个人主页北极的代码欢迎来访作者简介java后端学习者❄️个人专栏苍穹外卖日记SSM框架深入JavaWeb✨命运的结局尽可永在不屈的挑战却不可须臾或缺提要我们这里是基于Redis实现的消息队列还有更专业的消息队列如RabbitMQ等等后面我们专门学习。摘要本文介绍了Redis在秒杀场景下实现消息队列的几种方案对比重点分析了Stream消费者组模式的优势。主要内容包括核心方案对比阻塞队列单机版简单但不可靠Redis Stream分布式版支持ACK确认、消息重试和消费者组Stream核心机制消息持久化存储消费者组负载均衡Pending列表处理未确认消息XCLAIM实现消息重试秒杀场景应用生产者用XADD写入订单消息消费者组通过XREADGROUP获取消息处理成功后发送XACK确认未确认消息可由其他消费者接管优势总结可靠性消息不丢失扩展性支持多消费者并行处理容错性自动重试失败消息Redis Stream为秒杀系统提供了专业级的异步处理能力相比阻塞队列更适合生产环境。在秒杀场景中我们之前用阻塞队列实现了异步下单它就像餐厅里连接前台和后厨的订单挂钩虽然管用但这个挂钩只存在于这个餐厅单个JVM进程内部。而Redis 消息队列则是一个独立于任何餐厅的、全国统一的快递系统。我们可以把消息订单交给它它能可靠地将消息送达给任何一个或多个收件人消费者服务。一、核心价值为什么需要消息队列回顾我们之前用BlockingQueue的方案它解决了异步的问题但也带来了新的挑战特性阻塞队列 (BlockingQueue)Redis 消息队列 (List/Stream)数据持久化❌ 服务重启队列里未处理的订单会丢失。✅ 消息存储在Redis内存中并支持持久化到硬盘重启不丢失。跨服通信❌ 只能在单个JVM内部的不同线程间传递。✅ 可以在多个独立的微服务实例间传递是分布式系统的“信使”。消费模式单线程阻塞获取 (take())。支持多种模式一个消息被一个消费者处理点对点或被多个消费者处理发布/订阅。功能边界极简只具备基础的“存”和“取”。功能强大提供消息确认(ACK)、消费者组等完备机制。简单来说从BlockingQueue升级到 Redis 消息队列你的异步处理能力就从单机版升级到了网络版。二、Redis 的消息队列工具箱Redis 提供了多种数据结构来实现消息队列可以满足不同场景的需求。1. 基础版List 作为消息队列这是最简单的实现方式利用LPUSH生产消息BRPOP消费消息。生产端秒杀成功后将订单信息推入队列。消费端一个独立的线程使用BRPOP命令阻塞地等待新消息。这种方式虽然简单但有明显缺陷消息一旦被BRPOP取出就从队列中移除了。如果消费者在处理时崩溃这条消息就会永久丢失缺乏ACK机制。bash# 生产者将消息放入队列头部 LPUSH order_queue {orderId: 123, userId: 1} # 消费者从队列尾部阻塞式取出消息 BRPOP order_queue 0 1) order_queue 2) {orderId: 123, userId: 1} # 消息被取出队列中已没有这条消息了①、List 作为消息队列的核心操作操作命令说明生产消息LPUSH key value从队列左侧放入消息消费消息RPOP key从队列右侧取出消息非阻塞阻塞消费BRPOP key timeout队列为空时阻塞等待直到有新消息查看但不取出LRANGE key 0 -1查看队列所有消息典型秒杀用法java // 生产者Tomcat线程秒杀成功后 stringRedisTemplate.opsForList().leftPush(seckill:queue, orderJson); // 消费者独立线程循环处理 while (true) { String orderJson stringRedisTemplate.opsForList().rightPop(seckill:queue, 0, TimeUnit.SECONDS); // 处理订单... }②、相比阻塞队列的优点对比项阻塞队列 (BlockingQueue)Redis List跨服务共享❌ 只能单个JVM内✅ 多个服务实例共享数据持久化❌ 重启丢失✅ Redis 持久化RDB/AOF消息回溯❌ 取出即消失✅ 可用LRANGE查看历史多消费者竞争❌ 需要自己实现✅ 多个消费者BRPOP天然竞争单点故障❌ 服务重启消息全丢✅ 主从/集群高可用③、List 相比 Stream 的优点虽然 Stream 功能更强大但 List 在某些场景下仍有优势对比项ListStream简单性✅ 命令少易理解❌ 命令多学习成本高性能✅ 极高纯内存操作略低需要生成消息IDRedis版本要求✅ 所有版本都支持❌ 需要 Redis 5.0消息确认(ACK)❌ 不支持✅ 支持消费者组❌ 不支持✅ 支持一句话List 简单、快、兼容性好适合对消息可靠性要求不高的场景。Stream 功能全适合需要可靠消息的场景。④、List 在秒杀场景的适用性场景是否可用建议秒杀核心链路⚠️ 可用但有风险建议用 Stream 或专业 MQ非核心通知日志、统计✅ 完全够用List 简单高效开发测试、Demo✅ 首选配置简单快速验证List 的主要风险消费者取走消息后如果崩溃消息丢失无 ACK没有消费者组扩展性受限2.Pub/Sub 的核心模式Pub/Sub发布/订阅是 Redis 提供的一对多消息广播模式。发布者往频道发一条消息所有订阅了该频道的消费者都能同时收到。①、核心命令命令作用示例SUBSCRIBE channel订阅频道SUBSCRIBE seckill:orderPSUBSCRIBE pattern订阅匹配模式的频道PSUBSCRIBE seckill:*PUBLISH channel message发布消息到频道PUBLISH seckill:order {userId:1}UNSUBSCRIBE channel取消订阅UNSUBSCRIBE seckill:ordertext发布者 (Publisher) │ │ 发布消息 秒杀成功 ↓ ┌─────────────────────────────────────────┐ │ Redis Pub/Sub │ │ (消息广播中心) │ └─────────────────────────────────────────┘ │ │ │ │ 订阅频道 │ 订阅频道 │ 订阅频道 ↓ ↓ ↓ ┌───────┐ ┌───────┐ ┌───────┐ │消费者A│ │消费者B│ │消费者C│ └───────┘ └───────┘ └───────┘ (都收到相同消息)一句话发布者发一条消息所有订阅了该频道的消费者都能同时收到。答案是相比 JVM 内的阻塞队列Redis List 最大的优势是跨服务共享。②、Pub/Sub 和 Stream 的核心区别特性Pub/Sub (广播)Stream (队列)阻塞队列消息派发一对多广播一对一组内竞争一对一消息存储❌ 不存储发完即忘✅ 持久化存储❌ 内存重启丢失消息确认(ACK)❌ 不支持✅ 支持❌ 不支持历史消息❌ 订阅后只能收到新消息✅ 可回溯历史❌ 不支持消费者离线❌ 离线期间的消息全丢失✅ 重新上线可继续消费❌ 离线消息丢失性能极高无存储开销高极高③、Pub/Sub 的致命缺陷1. 消息不持久化发完即忘bash# 如果没有消费者在监听这条消息直接消失 PUBLISH seckill_channel {orderId: 123} (integer) 0 # 0表示没有消费者收到问题如果消费者服务重启、网络抖动、或者处理慢了消息就永远丢失了。2. 消费者离线期间的消息全部丢失text时间线 10:00:00 — 消费者A 正常运行 10:00:01 — 发布者发布消息 M1 → 消费者A收到 ✅ 10:00:02 — 消费者A 重启断开连接 10:00:03 — 发布者发布消息 M2 → 消费者A不在线M2 丢失 ❌ 10:00:04 — 消费者A 重启完成重新连接 10:00:05 — 发布者发布消息 M3 → 消费者A收到 ✅结果M2 永远丢失用户付了钱但订单没创建。3. 没有消息确认机制ACKjava // Pub/Sub 消费端 pubSub.subscribe(channel, (message) - { // 如果这里抛异常消息就没了 createOrder(message); // ← 假设这里数据库超时 // 消息已经被 Redis 标记为已发送不会重试 });问题消费者收到消息后无论处理成功还是失败Redis 都不管。无法重试无法补偿。4. 无法回溯历史Pub/Sub 只能收到订阅之后的消息。如果你想知道“过去1小时有哪些秒杀订单”Pub/Sub 给不了你。方案能防消息丢失能重试适合秒杀吗原因阻塞队列❌❌❌单机、重启丢失Pub/Sub❌❌❌离线丢消息、无ACKStream✅✅✅持久化、ACK、消费者组结论Pub/Sub 不适合做秒杀的核心消息队列。④Pub/Sub 适合什么场景虽然不适合秒杀但 Pub/Sub 有自己的用武之地1. 实时通知、广播java // 系统公告 redisTemplate.convertAndSend(system:notice, 服务器将于今日22:00升级); // 所有在线的管理后台都能收到实时通知2. 分布式缓存同步java // 服务A 更新了数据 redisTemplate.convertAndSend(cache:refresh, product:123); // 服务B、服务C 收到后主动刷新本地缓存3. Redisson 分布式锁的唤醒机制Redisson 的tryLock(waitTime)就是用 Pub/Sub 实现的等待锁的线程订阅一个频道锁释放时发布一条消息唤醒所有等待线程。⑤、总结问题答案Pub/Sub 能用来做秒杀异步下单吗不能会丢消息为什么不能不持久化、无ACK、离线丢消息那它适合做什么实时通知、广播、缓存同步、事件触发秒杀应该用哪个Redis Stream推荐或 专业MQRocketMQ/Kafka一句话Pub/Sub 是广播电台消息喊一嗓子就没了听到的人收到没听到的永远错过秒杀需要快递系统消息要可靠送达、签收确认、丢了要重发。Stream 就是后者。3. 进阶版Stream 作为消息队列 (推荐)Stream 是 Redis 5.0 引入的数据结构专门用于实现消息队列。下面我把最常用的命令按照使用流程整理出来。①、基础命令总览命令作用类比XADD添加消息到 StreamINSERTXREAD读取消息SELECTXRANGE按范围读取正序SELECT ORDER BYXREVRANGE按范围读取倒序SELECT ORDER BY DESCXLEN获取消息长度COUNTXTRIM裁剪消息限制长度清理旧数据XDEL删除消息DELETEXINFO查看 Stream 信息SHOW STATUS②、生产端命令发消息2.1 XADD - 添加消息bash# 基本格式 XADD stream_name [MAXLEN ~ count] * field1 value1 [field2 value2 ...] # 示例1最简单的添加 XADD seckill:stream * userId 1001 voucherId 2001 1680000000000-0 # 返回消息ID时间戳-序号 # 示例2限制队列长度近似裁剪性能更好 XADD seckill:stream MAXLEN ~ 10000 * userId 1002 voucherId 2002 1680000000001-0 # 示例3不限制长度 XADD seckill:stream * userId 1003 voucherId 2003 1680000000002-0消息ID格式毫秒时间戳-序号如1680000000000-0时间戳Redis 服务器本地时间序号同一毫秒内的消息序号从0开始2.2 XTRIM - 限制队列长度bash# 格式 XTRIM stream_name MAXLEN [~] count # 示例保留最近 5000 条消息 XTRIM seckill:stream MAXLEN 5000 (integer) 0 # 被删除的消息数量 # 示例近似裁剪性能更好 XTRIM seckill:stream MAXLEN ~ 5000 (integer) 0③、消费端命令读消息3.1 XREAD - 读取消息bash# 格式 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key1 key2 ... id1 id2 ... # 示例1读取 2 条新消息从消息ID 0-0 开始 XREAD COUNT 2 STREAMS seckill:stream 0-0 1) 1) seckill:stream 2) 1) 1) 1680000000000-0 2) 1) userId 2) 1001 3) voucherId 4) 2001 2) 1) 1680000000001-0 2) 1) userId 2) 1002 3) voucherId 4) 2002 # 示例2永久阻塞读取等待新消息 XREAD COUNT 1 BLOCK 0 STREAMS seckill:stream $ # $ 表示只读新消息BLOCK 0 表示永久阻塞直到有新消息 # 示例3阻塞 5 秒读取 XREAD COUNT 1 BLOCK 5000 STREAMS seckill:stream $ # 5秒内没有新消息就返回 nil3.2 XRANGE / XREVRANGE - 范围查询bash# 格式 XRANGE stream_name start end [COUNT count] # 示例1查询所有消息 XRANGE seckill:stream - 1) 1) 1680000000000-0 2) 1) userId 2) 1001 3) ... # 示例2查询指定时间范围的消息 XRANGE seckill:stream 1680000000000 1680000001000 # 示例3查询最近 10 条消息倒序 XREVRANGE seckill:stream - COUNT 10④、消费者组命令核心4.1 XGROUP - 创建/管理消费者组bash# 格式 XGROUP [CREATE|DESTROY|DELCONSUMER|SETID] key groupname [id|$] [MKSTREAM] # 示例1创建消费者组从消息ID 0 开始即从头消费 XGROUP CREATE seckill:stream order-group 0 MKSTREAM OK # 示例2创建消费者组只消费新消息 XGROUP CREATE seckill:stream order-group $ MKSTREAM OK # 示例3删除消费者组 XGROUP DESTROY seckill:stream order-group (integer) 1 # 示例4删除消费者组中的某个消费者 XGROUP DELCONSUMER seckill:stream order-group consumer-1 (integer) 1 # 示例5重置消费者组的位置重新从某ID开始消费 XGROUP SETID seckill:stream order-group 0 OK4.2 XREADGROUP - 消费者组内读取bash# 格式 XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK ms] STREAMS key1 key2 ... id1 id2 ... # 示例1消费者组读取0 表示未读消息 XREADGROUP GROUP order-group consumer-1 COUNT 1 STREAMS seckill:stream 1) 1) seckill:stream 2) 1) 1) 1680000000000-0 2) 1) userId 2) 1001 # 示例2阻塞读取 XREADGROUP GROUP order-group consumer-1 COUNT 1 BLOCK 0 STREAMS seckill:stream # 示例3读取特定消息用于重试 XREADGROUP GROUP order-group consumer-1 STREAMS seckill:stream 1680000000000-0注意是一个特殊ID表示只读取从未被任何消费者读取过的新消息。4.3 XACK - 确认消息已处理bash# 格式 XACK stream_name groupname id [id ...] # 示例确认消息已处理 XACK seckill:stream order-group 1680000000000-0 (integer) 14.4 XPENDING - 查看待确认消息bash# 格式 XPENDING stream_name groupname [start end count] [consumer] # 示例1查看所有待确认消息 XPENDING seckill:stream order-group 1) (integer) 3 # 3条未确认消息 2) 1680000000000-0 # 最小消息ID 3) 1680000000002-0 # 最大消息ID 4) 1) consumer-1 # 各消费者的消息数 2) 2 3) consumer-2 4) 1 # 示例2查看具体待确认的消息带详情 XPENDING seckill:stream order-group - 10 1) 1) 1680000000001-0 2) consumer-1 # 谁取的 3) (integer) 15000 # 空闲时间毫秒 4) (integer) 1 # 被读取次数 # 示例3查看某个消费者的待确认消息 XPENDING seckill:stream order-group - 10 consumer-14.5 XCLAIM - 转移超时消息bash# 格式 XCLAIM stream_name groupname consumername min-idle-time id [id ...] [IDLE ms] [TIME ms] [RETRY count] [FORCE] # 示例转移空闲超过 30 秒的消息给另一个消费者 XCLAIM seckill:stream order-group consumer-2 30000 1680000000001-0 1) 1) 1680000000001-0 2) 1) userId 2) 1002 # 示例转移后重置空闲时间为 0 XCLAIM seckill:stream order-group consumer-2 30000 1680000000001-0 IDLE 04.6 XAUTOCLAIM - 自动转移Redis 6.2bash# 格式 XAUTOCLAIM stream_name groupname consumername min-idle-time start [COUNT count] [JUSTID] # 示例自动转移空闲超过 30 秒的消息给 consumer-2 XAUTOCLAIM seckill:stream order-group consumer-2 30000 0-0 COUNT 10 1) 1680000000002-0 # 下次开始的ID 2) 1) 1) 1680000000001-0 2) 1) userId 2) 1002⑤、管理命令5.1 XINFO - 查看信息bash # 查看 Stream 基本信息 XINFO STREAM seckill:stream 1) length 2) (integer) 100 # 消息总数 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) last-generated-id 8) 1680000000100-0 9) groups 10) (integer) 1 # 消费者组数量 11) first-entry 12) 1) 1680000000000-0 2) 1) userId 2) 1001 13) last-entry 14) 1) 1680000000100-0 2) 1) userId 2) 1100 # 查看消费者组信息 XINFO GROUPS seckill:stream 1) 1) name 2) order-group 3) consumers 4) (integer) 2 5) pending 6) (integer) 0 # 查看消费者信息 XINFO CONSUMERS seckill:stream order-group 1) 1) name 2) consumer-1 3) pending 4) (integer) 0 5) idle 6) (integer) 50005.2 XLEN - 获取消息长度bash XLEN seckill:stream (integer) 10005.3 XDEL - 删除消息bash# 删除单条消息 XDEL seckill:stream 1680000000000-0 (integer) 1 # 删除多条消息 XDEL seckill:stream 1680000000001-0 1680000000002-0 (integer) 2一、核心概念对比特性单消费者模式消费者组模式消费者数量1 个多个组内竞争消息分配全量给唯一消费者组内负载均衡轮询等ACK确认❌ 不支持✅ 必须 ACK消息重试❌ 不支持✅ 支持未 ACK 的消息可重新消费消息持久化✅ 支持✅ 支持消费进度记录靠消费者自己记Redis 自动记录每个消费者组的消费进度适用场景单机异步处理分布式、高可用、弹性扩展二、单消费者模式一个消息队列只有一个消费者在消费。其他消费者无法同时消费同一条消息。2.1 核心命令命令作用XADD stream * field value添加消息到 StreamXREAD COUNT 1 STREAMS stream 0读取消息0 表示从头读XREAD BLOCK 0 STREAMS stream $阻塞读取新消息$ 表示只读新消息2.2 Java 实现java Service Slf4j public class StreamSingleConsumerService { Autowired private StringRedisTemplate stringRedisTemplate; private static final String STREAM_KEY seckill:stream; /** * 生产者添加消息到 Stream */ public void produce(Long userId, Long voucherId) { MapString, String message new HashMap(); message.put(userId, userId.toString()); message.put(voucherId, voucherId.toString()); message.put(orderId, snowflakeId()); // XADD seckill:stream * userId 1001 voucherId 2001 stringRedisTemplate.opsForStream().add(STREAM_KEY, message); } /** * 消费者循环读取消息单线程 */ PostConstruct public void consume() { new Thread(() - { while (true) { try { // XREAD COUNT 1 BLOCK 0 STREAMS seckill:stream $ StreamReadOptions options StreamReadOptions.empty() .block(Duration.ofSeconds(0)) // 永久阻塞 .count(1); ListMapRecordString, Object, Object records stringRedisTemplate.opsForStream().read( options, StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()) ); for (MapRecordString, Object, Object record : records) { MapObject, Object value record.getValue(); Long userId Long.valueOf(value.get(userId).toString()); Long voucherId Long.valueOf(value.get(voucherId).toString()); // 处理订单 orderService.createOrder(userId, voucherId); // ⚠️ 单消费者模式没有 ACK处理完就结束了 // 如果这里抛异常消息就丢了 } } catch (Exception e) { log.error(消费失败, e); // 消息丢失无法重试 } } }).start(); } }2.3 单消费者模式的问题textStream 中的消息: [M1, M2, M3, M4, M5] ↓ 唯一的消费者 C1 ↓ C1 处理 M2 时崩溃了 ↓ M2 永远丢失没有 ACK没有重试三、消费者组模式多个消费者组成一个组组内竞争消费消息。Redis 自动记录每个组的消费进度支持 ACK 确认和消息重试。3.1 核心命令命令作用XGROUP CREATE stream group $创建消费者组$ 表示从最新消息开始XGROUP CREATE stream group 0创建消费者组0 表示从头开始XREADGROUP GROUP group consumer从消费者组读取消息XACK stream group id确认消息已处理XPENDING stream group查看待确认的消息XCLAIM stream group consumer min-idle-time id转移超时未确认的消息给其他消费者3.2 创建消费者组java Configuration public class StreamConfig { PostConstruct public void initConsumerGroup() { try { stringRedisTemplate.opsForStream().createGroup(STREAM_KEY, ReadOffset.latest(), seckill-group); log.info(消费者组创建成功); } catch (Exception e) { log.warn(消费者组已存在); } } }3.3 消费者组实现java Service Slf4j public class StreamGroupConsumerService { Autowired private StringRedisTemplate stringRedisTemplate; Autowired private OrderService orderService; private static final String STREAM_KEY seckill:stream; private static final String GROUP_NAME seckill-group; private static final String CONSUMER_NAME consumer- UUID.randomUUID(); /** * 生产者添加消息到 Stream */ public void produce(Long userId, Long voucherId) { MapString, String message new HashMap(); message.put(userId, userId.toString()); message.put(voucherId, voucherId.toString()); stringRedisTemplate.opsForStream().add(STREAM_KEY, message); } /** * 消费者使用消费者组读取消息 */ PostConstruct public void startConsumer() { ExecutorService executor Executors.newFixedThreadPool(3); for (int i 0; i 3; i) { executor.submit(() - { while (true) { try { // XREADGROUP GROUP seckill-group consumer-xxx COUNT 1 BLOCK 0 StreamReadOptions options StreamReadOptions.empty() .block(Duration.ofSeconds(0)) // 永久阻塞 .count(1); ListMapRecordString, Object, Object records stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, CONSUMER_NAME), options, StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()) ); for (MapRecordString, Object, Object record : records) { String messageId record.getId().toString(); MapObject, Object value record.getValue(); try { // 处理订单 Long userId Long.valueOf(value.get(userId).toString()); Long voucherId Long.valueOf(value.get(voucherId).toString()); orderService.createOrder(userId, voucherId); // 处理成功发送 ACK // XACK seckill:stream seckill-group messageId stringRedisTemplate.opsForStream().acknowledge( GROUP_NAME, STREAM_KEY, messageId ); log.info(消息处理成功: {}, messageId); } catch (Exception e) { log.error(消息处理失败: {}, messageId, e); // 未 ACK 的消息会留在 Pending 列表可被其他消费者重试 // 不用手动处理等待超时后 XCLAIM } } } catch (Exception e) { log.error(消费循环异常, e); } } }); } } }3.4消费者组的工作原理text生产者 │ ▼ XADD ┌─────────────────┐ │ Redis Stream │ │ [M1][M2][M3] │ └────────┬────────┘ │ XREADGROUP ┌─────────────┼─────────────┐ │ │ │ ▼ ▼ ▼ 消费者组 seckill-group │ ┌────┴────┬──────────┐ ▼ ▼ ▼ C1(处理M1) C2(处理M2) C3(处理M3) │ │ │ ▼ XACK ▼ XACK ▼ XACK 确认成功 确认成功 处理失败(未ACK) │ ▼ M3 留在 Pending 列表 │ ▼ 等待其他消费者认领(XCLAIM)Pending 列表和消息重试当消费者处理失败没有 ACK时消息会进入Pending 列表。查看 Pending 列表bash XPENDING seckill:stream seckill-group 1) (integer) 1 # 1 条未确认消息 2) 1680000000000-0 # 最小消息ID 3) 1680000000000-0 # 最大消息ID 4) 1) consumer-abc # 各消费者详情 2) 1查看具体 Pending 消息bash XPENDING seckill:stream seckill-group - 10 1) 1) 1680000000000-0 # 消息ID 2) consumer-abc # 哪个消费者取的 3) (integer) 5000 # 空闲时间(ms) 4) (integer) 3 # 被取次数转移超时消息XCLAIMjava Component public class PendingMessageHandler { /** * 定时处理超时未 ACK 的消息 */ Scheduled(fixedDelay 10000) // 每10秒执行一次 public void claimPendingMessages() { // 获取 pending 超过 30 秒的消息 PendingMessages pending stringRedisTemplate.opsForStream() .pending(STREAM_KEY, GROUP_NAME); for (PendingMessage message : pending) { long idleTime System.currentTimeMillis() - message.getElapsedTimeSinceLastDelivery(); if (idleTime 30000) { // 30 秒未 ACK // 转移给当前消费者处理 ClaimResult result stringRedisTemplate.opsForStream() .claim(GROUP_NAME, recovery-consumer, STREAM_KEY, List.of(message.getId()), Duration.ofSeconds(30)); // 重新处理 result.getMessages().forEach(record - { // 处理订单... stringRedisTemplate.opsForStream() .acknowledge(GROUP_NAME, STREAM_KEY, record.getId()); }); } } } }四、两种模式对比总结特性单消费者模式消费者组模式消费者数只能 1 个多个组内负载均衡高可用❌ 单点故障✅ 多消费者容错横向扩展❌ 无法扩展✅ 增加消费者即可消息确认(ACK)❌ 无✅ 必须 ACK消息重试❌ 无✅ Pending XCLAIM消费进度记录消费者自己记Redis 自动记录离线消息可读可读更可靠复杂度低中高适用场景单机开发、测试生产环境、分布式系统五、秒杀场景选择建议场景推荐模式原因开发测试单消费者模式简单快速单机部署、可接受少量丢失单消费者模式简单够用生产环境、订单不能丢消费者组模式ACK 重试 高可用高并发、多实例部署消费者组模式负载均衡 弹性扩展六、一句话总结单消费者模式 一个人吃饺子吃快了吃慢了都是他噎住了饺子就没了消费者组模式 一桌人吃饺子有人吃噎住了别人可以帮他把饺子吃完XCLAIM每个人吃完还要喊一声吃完了XACK这样不会丢饺子。秒杀场景下的实战对比现在我们可以把之前学的阻塞队列和将要学的Redis Stream在秒杀场景下进行一个直观的对比。环节阻塞队列方案 (内存)Redis Stream 方案 (专业版)1. 校验 (Lua)Lua脚本在Redis中校验库存和一人一单。完全相同。这一步是所有秒杀系统的基石不会变。2. 发送消息将订单对象add到JVM内的BlockingQueue。在Lua脚本中或Java代码里调用XADD命令将消息发送到RedisStream。3. 响应客户立即返回“排队中”用户请求结束。完全相同。用户体验无差别。4. 异步消费一个单线程的消费者从BlockingQueue中take()消息。一个独立的消费者组从Stream中XREADGROUP消息。5. 可靠性低。服务重启或消费者异常订单就彻底丢失了。高。消息在Redis中持久化消费者处理完后需发送XACK确认否则消息可被重新消费。6. 扩展性差。多实例部署时服务A的队列消息服务B的消费者无法处理。强。Stream的消费者组机制天然支持多个消费者共同处理且保证一条消息只被一个消费者处理。补充Stream 异步秒杀的实现流程一个基于 Redis Stream 的可靠异步秒杀流程如下准备事前创建秒杀专用的Stream和消费者组。抢购主线程执行Lua脚本校验资格。若成功执行XADD命令将{userId, voucherId, orderId}写入Stream。立即向前端返回“订单ID”提示“排队中”。处理后台线程后台线程循环调用XREADGROUP从消费者组读取消息。读取到新消息后调用服务执行数据库下单操作。操作成功后立即执行XACK确认消息将其从“待处理”列表中移除。如果消费者在处理下单时宕机因为没发XACK这条消息会一直留在“待处理”列表。其他消费者实例重启后可以重新读取并处理它从而保证了消息的不丢失。总结阻塞队列是单机版的异步处理简单直接但无法应对分布式和可靠性挑战。Redis Stream是企业级的消息队列方案它赋予了你的事务可靠性、跨服通信能力和弹性扩展能力。如果把构建秒杀系统比作送快递阻塞队列是员工自己用手推车在后厨和前台之间来回跑。Redis Stream则是引入了顺丰/京东的物流系统有快递单号消息ID、签收确认ACK、还能由多个快递员并行派送消费者组即使某个快递员消费者生病了快递消息也不会丢。对于准备上线的项目Stream 是比阻塞队列更健壮和专业的方案。结语如果对你有帮助请点赞关注收藏你的支持就是我最大的鼓励