RabbitMQ多线程消费与死信队列方案
延迟启动rabbitmq,动态停止或启动消费https://blog.csdn.net/duanyuanjun/article/details/128674859rabbitmq 发送消息跟踪https://blog.csdn.net/duanyuanjun/article/details/128664318动态创建Rabbitmq队列和交换器https://blog.csdn.net/duanyuanjun/article/details/128676268死信队列详细流程方便理解# 统计平台 RabbitMQ 多线程消费 死信队列 技术方案 ## 一、架构总览 生产者 RabbitMQ 消费者统计平台 ─────── ──────── ────────────── ┌─────────────────┐ publish ──────────────│ flink.exchange │ │ (topic) │ └───┬─────────┬───┘ │ │ flink.msg flink.msg.dead │ │ ▼ ▼ ┌──────────────────┐ ┌─────────────────────┐ │ 主队列 │ │ 死信 Exchange │ │ statistical.lite │ │ flink.exchange.dead │ │ .aggregator │ │ (direct) │ │ │ └──────────┬──────────┘ │ x-max-length │ │ │ 100000 │ flink.msg.dead │ x-dead-letter- │ │ │ exchange │ ▼ │ flink.exchange ├─ ┌─────────────────────┐ │ .dead │ │ 死信队列 │ │ x-dead-letter- │ │ statistical.lite │ │ routing-key │ │ .aggregator.dead │ │ flink.msg.dead │ │ x-message-ttl24h │ └───────┬──────────┘ │ x-max-length50000 │ │ └──────────┬──────────┘ ▼ │ ┌──────────────┐ 死信消费者可选 │ 多线程消费 │ 记录日志/告警/重放 │ 3~10 并发 │ │ prefetch250 │ └──────────────┘ --- ## 二、核心代码 ### 2.1 容器工厂配置 — RabbitMQConfig.java **文件路径**com.statistical.config.RabbitMQConfig **职责**自定义 SimpleRabbitListenerContainerFactory覆盖 Spring Boot 默认工厂实现并发消费。 java Slf4j Configuration ConditionalOnBean(AggregatorManager.class) public class RabbitMQConfig { Value(${statistical.aggregator.listener.min-concurrency:3}) private int minConcurrency; Value(${statistical.aggregator.listener.max-concurrency:10}) private int maxConcurrency; Value(${statistical.aggregator.listener.prefetch:250}) private int prefetch; Bean(rabbitListenerContainerFactory) public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(minConcurrency); factory.setMaxConcurrentConsumers(maxConcurrency); factory.setPrefetchCount(prefetch); factory.setConsumerTagStrategy( queue - lite-consumer- queue - System.currentTimeMillis()); log.info([RabbitMQ-Config] 并发消费容器工厂初始化, minConcurrency{}, maxConcurrency{}, prefetch{}, minConcurrency, maxConcurrency, prefetch); return factory; } } ### 2.2 消息监听器 — StatisticalMessageListener.java **文件路径**com.statistical.lite.listener.StatisticalMessageListener **职责**统一 MQ 消费入口通过 RabbitListener 实现多线程消费。 java Slf4j Component ConditionalOnBean(AggregatorManager.class) public class StatisticalMessageListener { Autowired private AggregatorManager aggregatorManager; RabbitListener( concurrency ${statistical.aggregator.listener.min-concurrency:3} -${statistical.aggregator.listener.max-concurrency:10}, bindings QueueBinding( value Queue( name ${statistical.aggregator.queue-name:statistical.lite.aggregator}, durable true), exchange Exchange( name ${statistical.aggregator.exchange-name:flink.exchange}, type topic), key ${statistical.aggregator.routing-key:flink.msg} ) ) public void onMessage(String message) { try { MqMessageWrapper.MessageWrapper vo JSONObject.parseObject(message, MqMessageWrapper.MessageWrapper.class); aggregatorManager.submit(vo.getData()); } catch (Exception e) { log.error([Lite-Listener] 消息解析失败已丢弃: {}, message, e); } } } --- ## 三、配置参数详解 — application.yml yaml statistical: aggregator: # ---- RabbitMQ 队列绑定 ---- queue-name: statistical.lite.aggregator # 主队列 exchange-name: flink.exchange # 上游 Exchange routing-key: flink.msg # 上游 Routing Key # ---- RabbitMQ 并发消费配置 ---- listener: min-concurrency: 3 # 最小并发消费者数 max-concurrency: 10 # 最大并发消费者数弹性伸缩上限 prefetch: 250 # 每个消费者预取数 # ---- 死信队列配置待落地 ---- dlx-exchange: flink.exchange.dead # 死信 Exchange dlx-routing-key: flink.msg.dead # 死信 Routing Key dlx-queue: statistical.lite.aggregator.dead # 死信队列名 queue-max-length: 100000 # 主队列最大长度 dlx-ttl-ms: 86400000 # 死信队列消息 TTL24h dlx-max-length: 50000 # 死信队列最大长度 ### 参数选择参考 | 参数 | 1C1G | 2C2G | 4C4G | 说明 | |------|------|------|------|------| | min-concurrency | 2 | 3 | 5 | 1C 上线程多反而因上下文切换降低吞吐 | | max-concurrency | 5 | 10 | 20 | 弹性伸缩上限按 CPU 核数调整 | | prefetch | 250 | 250 | 250 | submit() 极轻量大批预取提升吞吐 | | queue-max-length | 100000 | 100000 | 100000 | 防止队列无限增长拖垮 RabbitMQ | --- ## 四、并发安全保证 多线程消费后多个消费者线程并发调用 aggregatorManager.submit()各组件的线程安全性 | 组件 | 线程安全机制 | 说明 | |------|-------------|------| | **GuavaAggregatorManager** | Cache.asMap().compute() | 原子操作同一 key 串行执行不同 key 并发安全 | | **DisruptorAggregatorManager** | ProducerType.MULTI RingBuffer CAS | 多生产者安全发布单 EventHandler 串行聚合 | | **TenantAlgoDayAccumulator** | LongAdder synchronized | count/sum 使用 LongAdder 无锁累加max/min 使用 synchronized 保证原子 | | **FlushScheduler** | ReentrantLock volatile | storageLock/prometheusLock 双锁分离volatile 防重入 | | **LiteMysqlSinkService** | ON DUPLICATE KEY UPDATE | 天然幂等重复写入不影响正确性 | --- ## 五、死信队列方案待落地 ### 5.1 为什么需要死信队列 | 场景 | 无死信队列 | 有死信队列 | |------|-----------|-----------| | 消费不过来队列堆积 | 队列无限增长 → RabbitMQ 内存溢出 → 流控阻塞生产者 | 队列达上限后最老消息转入死信队列主队列长度可控 | | 需要排查丢失消息 | 无法追踪 | 死信队列保留溢出消息带 x-death Header 可溯源 | | 极端情况 | RabbitMQ 宕机 | 仅死信队列增长RabbitMQ 内存可控 | ### 5.2 死信触发条件 | 条件 | 说明 | |------|------| | 消息被 basic.reject/basic.nack 且 requeuefalse | 消费者主动拒绝 | | 消息 TTL 过期x-message-ttl | 消息超时 | | 队列达到最大长度x-max-length | **本项目主要场景**最老消息被挤出 | ### 5.3 死信消息附加 Header 消息变为死信后RabbitMQ 自动添加 x-death Header | 字段 | 值示例 | 含义 | |------|--------|------| | x-death[0].reason | maxlen | 死因maxlen / rejected / expired | | x-death[0].queue | statistical.lite.aggregator | 原始队列名 | | x-death[0].exchange | flink.exchange | 原始 Exchange | | x-death[0].routing-keys | [flink.msg] | 原始 Routing Key | | x-death[0].time | 2026-05-14T10:30:0008:00 | 变为死信的时间 | | x-death[0].count | 1 | 被死信路由的次数 | ### 5.4 待落地的代码改造 主队列增加 arguments java RabbitListener( concurrency ${statistical.aggregator.listener.min-concurrency:3} -${statistical.aggregator.listener.max-concurrency:10}, bindings { // 主队列绑定 QueueBinding( value Queue( name ${statistical.aggregator.queue-name:statistical.lite.aggregator}, durable true, arguments { Argument(name x-max-length, value ${statistical.aggregator.queue-max-length:100000}, type java.lang.Integer), Argument(name x-dead-letter-exchange, value ${statistical.aggregator.dlx-exchange:flink.exchange.dead}), Argument(name x-dead-letter-routing-key, value ${statistical.aggregator.dlx-routing-key:flink.msg.dead}) } ), exchange Exchange( name ${statistical.aggregator.exchange-name:flink.exchange}, type topic), key ${statistical.aggregator.routing-key:flink.msg} ), // 死信 Exchange 死信队列绑定 QueueBinding( value Queue( name ${statistical.aggregator.dlx-queue:statistical.lite.aggregator.dead}, durable true, arguments { Argument(name x-message-ttl, value ${statistical.aggregator.dlx-ttl-ms:86400000}, type java.lang.Integer), Argument(name x-max-length, value ${statistical.aggregator.dlx-max-length:50000}, type java.lang.Integer) } ), exchange Exchange( name ${statistical.aggregator.dlx-exchange:flink.exchange.dead}, type direct), key ${statistical.aggregator.dlx-routing-key:flink.msg.dead} ) } ) **注意**RabbitMQ 中已存在的队列不支持修改 arguments。落地时需先在管理端删除旧队列再启动应用让新声明生效。生产环境建议在 RabbitMQ 管理端预创建队列。 ### 5.5 死信队列处理方式 | 方式 | 适用场景 | 操作 | |------|---------|------| | 人工排查 | 偶发溢出 | RabbitMQ Management 查看定位原因 | | 独立消费者 | 需自动化 | 为死信队列写独立 RabbitListener记录日志/告警 | | 重放回主队列 | 需重新消费 | 死信消费者将消息重新 publish 到原 Exchange注意避免再溢出循环 | | 定时清理 | 数据量大 | 死信队列配 x-message-ttl x-expires 自动清理 | --- ## 六、容量评估1C1G 基准 ### 6.1 消息处理耗时分解Guava 模式 | 步骤 | 操作 | 预估耗时 | |------|------|---------| | JSON 反序列化 | JSONObject.parseObject() | ~0.05-0.15ms | | 日期格式化 | KeyUtils.todayStartTime() | ~0.01ms | | 6× Cache.compute() | ConcurrentHashMap 原子 LongAdder 累加 | ~0.06-0.12ms | | synchronized max/min | updateMaxMin() | ~0.005ms | | **合计** | | **~0.15-0.3ms/条** | ### 6.2 吞吐量预估 | 模式 | 1C1G 持续 | 1C1G 突发 | 2C2G 持续 | 4C4G 持续 | |------|----------|----------|----------|----------| | Guava | 1,000-2,000 TPS | ~3,000 TPS | 3,000-5,000 TPS | 5,000-8,000 TPS | | Disruptor | 3,000-5,000 TPS | ~8,000 TPS | 5,000-10,000 TPS | 10,000-20,000 TPS | ### 6.3 消费不过来时的连锁反应 阶段1队列堆积无感知 → 消息延迟增大但数据不丢 阶段2RabbitMQ 内存告警 → 超过 vm_memory_high_watermark默认 40% → 阻塞生产者连接流控 → 内存回到安全水位后自动恢复 阶段3极端情况 → 无上限队列 → RabbitMQ 磁盘满 → 宕机 → 加了死信队列后 → 主队列长度锁死在 max-length → 安全 --- ## 七、关键注意事项 | 事项 | 说明 | |------|------| | **队列已存在时 arguments 不可变** | 修改队列参数前必须先删除旧队列否则应用启动报错 | | **x-dead-letter-routing-key 必须设** | 不设则死信用原 routing key 路由会回到主队列造成死循环 | | **x-max-length 计数包含 unacked** | 10 消费者 × 250 prefetch 2,500 在途消息也占额度 | | **ConditionalOnBean 保证条件生效** | statistical.aggregator.modeoff 时不创建工厂和监听器 | | **水平扩容无需改代码** | 多实例消费同一队列RabbitMQ 自动轮询分发 | | **prefetch250 是 Spring Boot 默认值** | 本项目 submit() 极轻量250 合理如消费者处理慢可降低 |