我们是由枫哥组建的IT技术团队成立于2017年致力于帮助IT从业者提供实力成功入职理想企业我们提供一对一学习辅导由知名大厂导师指导分享Java技术、参与项目实战等服务并为学员定制职业规划全面提升竞争力过去8年我们已成功帮助数千名求职者拿到满意的OfferIT枫斗者、IT枫斗者-Java面试突击。RabbitMQ 从入门到生产秒杀场景的可靠性实战一、为什么秒杀必须用 MQ秒杀场景的典型特征流量脉冲式激增、库存有限、一致性要求高。直接打穿到数据库会导致连接池耗尽、行锁竞争最终服务雪崩。引入 RabbitMQ 实现异步削峰能力作用效果异步提速Redis 扣减后直接返回后续操作异步处理接口 RT 从 3s → 50ms削峰填谷1万 QPS 峰值缓冲到队列消费端按 1000/s 匀速处理保护 MySQL服务解耦下单主流程不依赖短信/积分服务下游故障不影响主链路二、可靠性四重保障消息从生产到消费经历四个关键节点每个节点都可能丢消息生产者 ──► Exchange ──► Queue ──► 消费者 │ │ │ │ ▼ ▼ ▼ ▼ Confirm Return 持久化 手动ACK (到Broker) (到队列) (防重启) (业务成功)四层兜底策略层级风险解决方案生产者→Broker网络抖动导致消息未到达Confirm 机制 本地消息表Exchange→Queue路由失败交换机/队列绑定错误Return 回调 补偿回滚Queue→消费者消费失败或进程崩溃手动 ACK有限重试最终失败重试耗尽仍失败死信队列(DLQ)补偿服务三、核心代码实现3.1 秒杀入口Redis 预检 异步投递ServiceRequiredArgsConstructorpublicclassSeckillService{privatefinalStringRedisTemplateredisTemplate;privatefinalRabbitTemplaterabbitTemplate;privatefinalSeckillCompensationServicecompensationService;privatefinalRedisIdWorkeridWorker;// Lua 脚本原子校验库存 防重privatestaticfinalDefaultRedisScriptLongSECKILL_SCRIPTnewDefaultRedisScript();static{SECKILL_SCRIPT.setScriptText( if (redis.call(get, KEYS[1]) 0) then return 1; -- 库存不足 end if (redis.call(sismember, KEYS[2], ARGV[1]) 1) then return 2; -- 重复下单 end redis.call(decr, KEYS[1]); redis.call(sadd, KEYS[2], ARGV[1]); return 0; );SECKILL_SCRIPT.setResultType(Long.class);}publicResultLongseckill(LongvoucherId){LonguserIdUserContext.getUserId();longorderIdidWorker.nextId(order);// 1. Redis 原子预检LongresultredisTemplate.execute(SECKILL_SCRIPT,List.of(seckill:stock:voucherId,seckill:order:voucherId),userId.toString());if(result1)returnResult.fail(库存不足);if(result2)returnResult.fail(请勿重复下单);// 2. 构造订单消息VoucherOrderordernewVoucherOrder(orderId,userId,voucherId);// 3. 投递 MQ异步落库try{rabbitTemplate.convertAndSend(MqConfig.SECKILL_EXCHANGE,MqConfig.SECKILL_KEY,order,message-{// 携带业务 ID用于 Confirm 回调追踪message.getMessageProperties().setCorrelationId(orderId.toString());returnmessage;});returnResult.ok(orderId);// 立即返回用户看到排队中}catch(Exceptione){// 4. 投递失败必须补偿否则 Redis 已扣减但 DB 无订单log.error(MQ投递失败, orderId{}, 触发补偿,orderId,e);compensationService.rollback(voucherId,userId,MQ_SEND_FAIL);returnResult.fail(系统繁忙请稍后重试);}}}3.2 MQ 配置死信队列 手动 ACKConfigurationpublicclassMqConfig{// 主链路publicstaticfinalStringSECKILL_EXCHANGEseckill.topic;publicstaticfinalStringSECKILL_QUEUEseckill.queue;publicstaticfinalStringSECKILL_KEYseckill.order;// 死信链路publicstaticfinalStringDLX_EXCHANGEseckill.dlx.topic;publicstaticfinalStringDLX_QUEUEseckill.dlx.queue;publicstaticfinalStringDLX_KEYseckill.dlx;BeanpublicQueueseckillQueue(){MapString,ObjectargsnewHashMap();// 绑定死信交换机消息被 NACK 且 requeuefalse 时进入 DLQargs.put(x-dead-letter-exchange,DLX_EXCHANGE);args.put(x-dead-letter-routing-key,DLX_KEY);// 队列持久化durabletruereturnnewQueue(SECKILL_QUEUE,true,false,false,args);}BeanpublicSimpleRabbitListenerContainerFactorymanualAckFactory(ConnectionFactorycf){SimpleRabbitListenerContainerFactoryfactorynewSimpleRabbitListenerContainerFactory();factory.setConnectionFactory(cf);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动 ACKfactory.setPrefetchCount(1);// 公平分发避免单消费者积压returnfactory;}// Exchange、DLQ 等 Bean 省略...}3.3 消费者手动 ACK 有限重试ComponentSlf4jRequiredArgsConstructorpublicclassSeckillConsumer{privatefinalVoucherOrderServiceorderService;privatefinalRedissonClientredisson;privatefinalRabbitTemplaterabbitTemplate;RabbitListener(queuesMqConfig.SECKILL_QUEUE,containerFactorymanualAckFactory)publicvoidconsume(VoucherOrderorder,Channelchannel,Messagemsg)throwsIOException{longdeliveryTagmsg.getMessageProperties().getDeliveryTag();// 1. 用户维度分布式锁防并发重复消费RLocklockredisson.getLock(seckill:consume:order.getUserId());if(!lock.tryLock(3,10,TimeUnit.SECONDS)){// 获取锁失败ACK 丢弃其他节点在处理channel.basicAck(deliveryTag,false);return;}try{// 2. 幂等校验DB 是否已存在if(orderService.exists(order.getId())){channel.basicAck(deliveryTag,false);return;}// 3. 执行业务事务方法创建订单 扣减库存orderService.createOrder(order);// 4. 成功 ACKchannel.basicAck(deliveryTag,false);}catch(Exceptione){log.error(订单处理失败, orderId{}, 准备重试,order.getId(),e);handleRetry(order,msg,channel,deliveryTag);}finally{lock.unlock();}}privatevoidhandleRetry(VoucherOrderorder,Messagemsg,Channelchannel,longtag)throwsIOException{Integerretry(Integer)msg.getMessageProperties().getHeaders().getOrDefault(x-retry,0);if(retry3){// 重新投递重试次数 1rabbitTemplate.convertAndSend(MqConfig.SECKILL_EXCHANGE,MqConfig.SECKILL_KEY,order,m-{m.getMessageProperties().setHeader(x-retry,retry1);returnm;});// ACK 原消息避免重复channel.basicAck(tag,false);}else{// 超过上限NACK 进入死信队列log.error(重试耗尽, orderId{} 进入 DLQ,order.getId());channel.basicNack(tag,false,false);}}}3.4 生产者可靠性Confirm ReturnComponentSlf4jRequiredArgsConstructorpublicclassMqProducerConfig{privatefinalSeckillCompensationServicecompensationService;privatefinalObjectMapperobjectMapper;PostConstructpublicvoidinit(){RabbitTemplatetemplaterabbitTemplate();// 1. Confirm 回调Broker 是否收到消息template.setConfirmCallback((correlationData,ack,cause)-{StringidcorrelationData!null?correlationData.getId():unknown;if(!ack){log.error(Confirm NACK, correlationId{}, cause{},id,cause);// 实际生产写入本地消息表由定时任务重发}});// 2. Return 回调消息到达 Exchange 但无法路由到 Queuetemplate.setReturnsCallback(returned-{log.error(消息不可路由: exchange{}, routingKey{}, replyCode{},returned.getExchange(),returned.getRoutingKey(),returned.getReplyCode());// 解析消息触发补偿try{VoucherOrderorderobjectMapper.readValue(returned.getMessage().getBody(),VoucherOrder.class);compensationService.rollback(order.getVoucherId(),order.getUserId(),ROUTING_FAIL);}catch(Exceptione){log.error(Return 回调处理异常,e);}});// 必须开启 mandatoryReturn 回调才生效template.setMandatory(true);}BeanpublicRabbitTemplaterabbitTemplate(){// 配置 ConnectionFactory 等...}}3.5 死信消费 补偿服务最终一致性ComponentSlf4jRequiredArgsConstructorpublicclassDlxConsumer{privatefinalVoucherOrderServiceorderService;privatefinalSeckillCompensationServicecompensationService;RabbitListener(queuesMqConfig.DLX_QUEUE,containerFactorymanualAckFactory)publicvoidhandleDlx(Messagemsg,Channelchannel)throwsIOException{longtagmsg.getMessageProperties().getDeliveryTag();try{VoucherOrderorderparse(msg);// 1. 再次检查是否已落库幂等if(!orderService.exists(order.getId())){// 2. 未落库则回滚 Redis释放库存和下单标记compensationService.rollback(order.getVoucherId(),order.getUserId(),DLX_COMPENSATE);}channel.basicAck(tag,false);}catch(Exceptione){// DLQ 处理再失败直接 ACK 避免死循环记录日志人工介入log.error(DLQ 处理失败, 直接丢弃: {},newString(msg.getBody()),e);channel.basicAck(tag,false);}}}ServiceRequiredArgsConstructorpublicclassSeckillCompensationService{privatefinalStringRedisTemplateredisTemplate;privatefinalSeckillVoucherServicevoucherService;Transactionalpublicvoidrollback(LongvoucherId,LonguserId,Stringreason){// 1. 移除已下单标记redisTemplate.opsForSet().remove(seckill:order:voucherId,userId.toString());// 2. 校准库存以 DB 为准避免 Redis 超卖SeckillVouchervouchervoucherService.getById(voucherId);intstockvoucher!null?voucher.getStock():0;redisTemplate.opsForValue().set(seckill:stock:voucherId,String.valueOf(stock));log.warn(补偿回滚完成: voucherId{}, userId{}, reason{},voucherId,userId,reason);}}四、关键设计决策4.1 为什么不用自动 ACK模式风险适用场景自动 ACK消息刚投递就 ACK业务异常时消息已丢失可容忍丢失的日志场景手动 ACK业务成功后 ACK失败可重试或进死信金融/订单等强一致场景4.2 重试策略选择策略对比 ├── 立即重试Requeuetrue │ └── 问题阻塞队列失败消息反复冲击消费者 ├── 延迟重试RetryTemplate 延迟队列 │ └── 优点指数退避避免雪崩 │ └── 缺点架构复杂需死信 TTL 延迟插件 └── 有限重试 DLQ本文方案 └── 优点简单可控最终一致性兜底 └── 缺点重试间隔短立即重投秒杀场景选择有限重试 DLQ瞬时流量高延迟重试会加剧队列堆积快速失败并补偿更符合业务预期。4.3 Redis 与 DB 一致性保障正常流程 Redis 预扣 ──► MQ 投递 ──► 消费落库 异常场景 1. Redis 扣减成功MQ 投递失败 └── 立即补偿回滚代码中 try-catch 2. MQ 投递成功消费失败重试3次 └── 进 DLQ补偿回滚 3. 消费成功ACK 失败网络抖动 └── 消息重新投递幂等校验拦截五、生产 checklist检查项说明队列持久化durabletrue重启后队列不丢失消息持久化deliveryMode2配合 Confirm 确保落盘消费者限流prefetchCount1避免单消费者内存溢出死信监控对 DLQ 长度设置告警及时人工介入幂等设计订单 ID 唯一索引防重复落库超时设置消费端超时 MQ 消息 TTL避免消息过期六、总结可靠的消息投递不是配置参数而是全链路的工程实践生产者Confirm 确保到达 BrokerReturn 捕获路由失败失败即补偿Broker队列持久化 死信绑定做最后一道防线消费者手动 ACK 有限重试 幂等校验业务成功才确认最终兜底死信队列 补偿服务保证数据最终一致⭐️推荐:Offer训练营介绍Java 面试 后端通用面试八股文Java后端企业级实战面试Java后端校招算法学习