文章目录⚠️ 一个至关重要的误区消费者幂等处理的标准流程常见的三种技术落地方案1. 数据库唯一索引 / 去重表最稳妥、适合金融/核心业务2. Redis 分布式锁 / 布隆过滤器适合高并发、非核心业务3. 业务状态机判断最轻量、最推荐结合使用在 RocketMQ 中因为网络抖动、消息重试或消费者重平衡Rebalance等原因消息重复投递是无法完全避免的。RocketMQ 保证的是“至少投递一次At Least Once”所以保证不因重复消息导致业务混乱的重任就落在了消费者身上也就是幂等消费。实现幂等的核心公式非常简单唯一业务标识 去重存储校验。⚠️ 一个至关重要的误区绝对不要使用 RocketMQ 的msgId作为幂等键因为在消息重试、或者生产者重复发送同一条业务数据时RocketMQ 生成的msgId可能会发生变化。实现幂等必须使用你业务自己的唯一标识如订单号orderId、支付流水号payId并在发送消息时将其设置为 Message 的keys。消费者幂等处理的标准流程在消费者拿到消息后内部应该遵循以下严格的执行顺序获取唯一业务标识:从 Message Key 中提取.从消费到的消息中提取出生产者埋入的业务唯一键例如msg.getKeys()获取订单号。防重校验去重判断:检查该业务是否已处理.去“去重存储介质”如 Redis 或数据库中查询这个业务标识。如果发现状态已经是“已处理”或“处理中”则直接返回CONSUME_SUCCESS丢弃该重复消息。执行核心业务逻辑:结合数据库事务或状态机.如果校验通过开始执行真正的业务改动。此时最好配合业务状态机例如只有当订单状态为待支付时才变更为已支付。如果是数据库操作确保在一个事务中完成。记录消费状态:持久化去重记录.业务成功后将该业务标识的状态更新为“已处理”比如在 Redis 里写入该 Key或者在数据库去重表中插入记录最后向 RocketMQ 回执成功。常见的三种技术落地方案根据你的业务并发量和对强一致性的要求通常有以下三种做法1. 数据库唯一索引 / 去重表最稳妥、适合金融/核心业务做法在数据库中建一张t_order_idempotent表将order_id设为唯一索引Unique Key。流程消费者收到消息后在同一个事务中先往去重表insert这条记录再执行业务 SQL。特点如果消息重复第二次insert会直接触发唯一键冲突异常DuplicateKeyException事务直接回滚天然幂等。2. Redis 分布式锁 / 布隆过滤器适合高并发、非核心业务做法利用 Redis 的SETNX命令或者 Redisson 锁。流程以业务 Key 作为 Redis 的键如lock:order:12345设置一个合理的过期时间如 5 分钟。如果SETNX返回成功说明是第一次处理开始洗数据如果返回失败说明正在处理或已处理完直接退出。特点性能极高但要防范 Redis 极端情况下宕机导致锁丢失的问题。3. 业务状态机判断最轻量、最推荐结合使用做法如果你的业务表本身就有完善的状态流转如1-待付款 - 2-已付款 - 3-已发货。流程更新状态的 SQL 直接写成带有前置条件的更新UPDATEt_orderSETstatus2WHEREid12345ANDstatus1;特点通过影响行数Rows Affected来判断是否更新成功。如果重复消息来临由于状态已经是2SQL 影响行数为 0直接判定为已处理即可。
消费者怎么做幂等消费??RocketMQ
发布时间:2026/5/20 15:32:34
文章目录⚠️ 一个至关重要的误区消费者幂等处理的标准流程常见的三种技术落地方案1. 数据库唯一索引 / 去重表最稳妥、适合金融/核心业务2. Redis 分布式锁 / 布隆过滤器适合高并发、非核心业务3. 业务状态机判断最轻量、最推荐结合使用在 RocketMQ 中因为网络抖动、消息重试或消费者重平衡Rebalance等原因消息重复投递是无法完全避免的。RocketMQ 保证的是“至少投递一次At Least Once”所以保证不因重复消息导致业务混乱的重任就落在了消费者身上也就是幂等消费。实现幂等的核心公式非常简单唯一业务标识 去重存储校验。⚠️ 一个至关重要的误区绝对不要使用 RocketMQ 的msgId作为幂等键因为在消息重试、或者生产者重复发送同一条业务数据时RocketMQ 生成的msgId可能会发生变化。实现幂等必须使用你业务自己的唯一标识如订单号orderId、支付流水号payId并在发送消息时将其设置为 Message 的keys。消费者幂等处理的标准流程在消费者拿到消息后内部应该遵循以下严格的执行顺序获取唯一业务标识:从 Message Key 中提取.从消费到的消息中提取出生产者埋入的业务唯一键例如msg.getKeys()获取订单号。防重校验去重判断:检查该业务是否已处理.去“去重存储介质”如 Redis 或数据库中查询这个业务标识。如果发现状态已经是“已处理”或“处理中”则直接返回CONSUME_SUCCESS丢弃该重复消息。执行核心业务逻辑:结合数据库事务或状态机.如果校验通过开始执行真正的业务改动。此时最好配合业务状态机例如只有当订单状态为待支付时才变更为已支付。如果是数据库操作确保在一个事务中完成。记录消费状态:持久化去重记录.业务成功后将该业务标识的状态更新为“已处理”比如在 Redis 里写入该 Key或者在数据库去重表中插入记录最后向 RocketMQ 回执成功。常见的三种技术落地方案根据你的业务并发量和对强一致性的要求通常有以下三种做法1. 数据库唯一索引 / 去重表最稳妥、适合金融/核心业务做法在数据库中建一张t_order_idempotent表将order_id设为唯一索引Unique Key。流程消费者收到消息后在同一个事务中先往去重表insert这条记录再执行业务 SQL。特点如果消息重复第二次insert会直接触发唯一键冲突异常DuplicateKeyException事务直接回滚天然幂等。2. Redis 分布式锁 / 布隆过滤器适合高并发、非核心业务做法利用 Redis 的SETNX命令或者 Redisson 锁。流程以业务 Key 作为 Redis 的键如lock:order:12345设置一个合理的过期时间如 5 分钟。如果SETNX返回成功说明是第一次处理开始洗数据如果返回失败说明正在处理或已处理完直接退出。特点性能极高但要防范 Redis 极端情况下宕机导致锁丢失的问题。3. 业务状态机判断最轻量、最推荐结合使用做法如果你的业务表本身就有完善的状态流转如1-待付款 - 2-已付款 - 3-已发货。流程更新状态的 SQL 直接写成带有前置条件的更新UPDATEt_orderSETstatus2WHEREid12345ANDstatus1;特点通过影响行数Rows Affected来判断是否更新成功。如果重复消息来临由于状态已经是2SQL 影响行数为 0直接判定为已处理即可。