【Redis从入门到精通】第67篇:Redis Stream——终于有了真正的消息队列 上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案下一篇【第68篇】HyperLogLog——用极小内存统计超大基数Redis作者antirez在还没退休的时候曾经说“Stream是Redis有史以来最复杂的命令集。”开发者“终于啊再也不用拿List冒充消息队列了。”运维“所以我们以后还要单独部署Kafka吗”架构师“看情况。Stream长得像Kafka但它骨子里还是Redis。”如果说之前的List、Pub/Sub、ZSet都只是借用Redis来做消息队列那Stream就是Redis官方亲自下场直接给你造了一个消息队列数据结构。它在设计上向Kafka致敬但保持了Redis的轻量和低延迟基因。本文将深入Stream的每一个毛孔让你从会用到懂为什么这么设计。一、Redis Stream的设计背景1.1 为什么Redis 5.0要引入Stream在Stream出现之前用Redis做消息队列是一场拼凑游戏Redis做MQ的史前时代 方式1List(BRPOP) ┌─────────────┐ │ 能干活 │ 缺点 │ 但要自己搞ACK │ - 消息确认需要自己实现 │ 要自己搞重试 │ - 不能回溯历史消息 └─────────────┘ - 多消费者没法按组消费 方式2Pub/Sub ┌─────────────┐ │ 能广播 │ 缺点 │ 但消息不会持久 │ - 掉线期间消息全丢 │ 离线全丢 │ - 没有消费者组概念 └─────────────┘ 方式3ZSet ┌─────────────┐ │ 能做延迟队列 │ 缺点 │ 但要自己轮询 │ - 高频轮询浪费资源 └─────────────┘ - 没有原生ACKantirez在设计Stream时同时参考了Kafka的分区日志和Redis的简洁哲学最终选择了一种追加日志消费者组的结构。它不是Kafka的替代品而是填补了想要消息队列但不想引入重量级中间件这个需求空白。1.2 Stream的数据模型从宏观上看一个Stream就是一个**只追加(append-only)**的日志文件Stream 一个逻辑上的消息序列 ─────────────────────────────────────────────────────────► 时间轴 [msg1] [msg2] [msg3] [msg4] [msg5] [msg6] [msg7] [msg8] ... │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ 每条消息包含 ┌─────────────────────────────┐ │ ID: 1716700000000-0 │ ← 毫秒时间戳-序号 │ 字段1: value1 │ │ 字段2: value2 │ ← 键值对KV格式 │ 字段3: value3 │ └─────────────────────────────┘ 关键特性 ✓ 消息追加到末尾不修改不删除除非手动DEL/XTRIM ✓ ID保证了全局顺序 ✓ 不同消费者以不同进度消费同一条Stream ✓ 消费者组可以追踪每个消费者的进度二、Stream ID时间与序号的精妙编码2.1 ID格式Stream的每条消息都有一个全局唯一的ID格式为Stream ID millisecondsTime-sequenceNumber 示例1716700000000-0 ┌──────────────┐ ┌┐ │ 1716700000000 │0│ └──────┬───────┘ └┤ │ │ UNIX毫秒时间戳 序列号(同毫秒内的微调) (2024-05-26 (从0开始自增) 06:00:00.000)这个设计有两个精妙之处时间可排序毫秒时间戳本身就保证了大致的时间顺序同毫秒防冲突序列号保证同一毫秒内的多条消息ID不重复# ID生成规则演示# 使用 * 让Redis自动生成ID127.0.0.1:6379XADD mystream * nameTomage251716700000000-0# 1ms后的消息127.0.0.1:6379XADD mystream * nameJerryage301716700000001-0# 时间戳自动1# 同1ms内多条消息127.0.0.1:6379XADD mystream * msga# 序列号01716700000002-0127.0.0.1:6379XADD mystream * msgb# 序列号11716700000002-1127.0.0.1:6379XADD mystream * msgc# 序列号21716700000002-2# 也可以手动指定ID不推荐除非有特殊需求127.0.0.1:6379XADD mystream1716700000003-0 msgmanual1716700000003-02.2 ID的约束规则ID规则 1. 新ID必须大于Stream中当前最大ID → 保证消息严格追加、不会插入到历史位置 → 如果客户端指定了一个小于等于最大ID的值会报错 2. 时间戳部分必须 当前最大ID的时间戳 → 防止时钟回拨造成ID错乱 3. 如果时间戳相同序列号必须 同时间戳的最大序列号 → 保证同一毫秒内的严格顺序 4. 最小ID0-0XRANGE用 - 表示 5. 最大ID理论上无穷XRANGE用 表示三、基础命令写入与读取3.1 XADD添加消息# 基本语法XADD stream_key[MAXLEN ~ count][NOMKSTREAM]IDfield value[field value...]# 添加一条消息127.0.0.1:6379XADD orders * order_id1001user_idU001amount99.91716700000000-0# 查看Stream长度127.0.0.1:6379XLEN orders(integer)1# 限制Stream长度约等于最近1000条trim效率优化127.0.0.1:6379XADD orders MAXLEN ~1000* order_id1002amount58.01716700000001-0MAXLEN ~中的~表示近似裁剪——Redis可能在Stream节点层面做性能优化实际保留的消息数可能略多于1000条。如果要求精确去掉~即可但性能会下降。3.2 XRANGE / XREVRANGE范围查询# 查询所有消息-表示最小ID表示最大ID127.0.0.1:6379XRANGE orders - COUNT51)1)1716700000000-02)1)order_id2)10013)user_id4)U0015)amount6)99.92)1)1716700000001-02)1)order_id2)10023)amount4)58.0# 查询指定时间范围127.0.0.1:6379XRANGE orders17167000000001716700001000COUNT100# 反向查询最新的在前127.0.0.1:6379XREVRANGE orders - COUNT103.3 XREAD读取消息非消费者组模式# 从最早的消息开始读取127.0.0.1:6379XREAD COUNT2STREAMS orders0-0# └─ 起始ID0-0最早# 阻塞读取新消息等5秒127.0.0.1:6379XREAD BLOCK5000STREAMS orders $# └─ 起始ID$只读最新# 同时读取多个Stream127.0.0.1:6379XREAD COUNT5BLOCK0STREAMS orders payments $XREAD的两种模式 模式1从头读指定ID如0-0或特定ID 用途消费者首次启动回溯历史消息 模式2读新消息使用$ 用途消费者只关心后续消息不关心历史 注意XREAD不是消费者组模式 它只是读取消息不会追踪消费进度。 适合一次性消费或手动管理进度的场景。四、消费者组Stream的大师级功能4.1 消费者组模型消费者组是Stream区别于List/PubSub的核心特性。它解决了多消费者协同消费的问题消费者组工作模型 Stream: orders ┌──────────────────────────────────────────────────┐ │ msg1 msg2 msg3 msg4 msg5 msg6 msg7 ... │ └──────────────────┬───────────────────────────────┘ │ 消费者组: order-processors │ ┌────────────┼────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │Consumer1│ │Consumer2│ │Consumer3│ │ 消费msg1 │ │ 消费msg2 │ │ 消费msg3 │ │ 消费msg4 │ │ 消费msg5 │ │ 消费msg6 │ │ 消费msg7 │ │ ... │ │ ... │ └─────────┘ └─────────┘ └─────────┘ │ │ │ └── 每个消费者维护自己的 ─┘ 消费进度(PENDING列表) 关键规则 ✓ 同一条消息只被组内一个消费者消费 ✓ 不同消费者组可以独立消费同一条Stream ✓ 组内消费者可以动态增减弹性伸缩4.2 消费者组命令详解# 1. 创建消费者组 # XGROUP CREATE stream group id [MKSTREAM]127.0.0.1:6379XGROUP CREATE orders order-processors0-0 MKSTREAM# └─ 组名 └─ 从最早的ID开始消费# └─ 如果Stream不存在则创建# 注意id0-0 从头消费id$ 只消费新消息# 2. 消费者组读取消息 # XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key id127.0.0.1:6379XREADGROUP GROUP order-processors consumer-1 COUNT2STREAMS orders# └─ 组名 └─ 消费者名 └─ 表示没确认过的消息# (也接受指定ID和0)# : 只发送从未交付给任何消费者的新消息# 0 : 发送该消费者pending中但未确认的消息# 指定ID : 从特定ID开始读取# 3. 确认消息 # XACK stream group id [id ...]127.0.0.1:6379XACK orders order-processors1716700000000-0(integer)1# 确认成功# 4. 查看Pending列表 127.0.0.1:6379XPENDING orders order-processors1)(integer)5# Pending消息总数2)1716700000000-0# 最小Pending ID3)1716700000010-0# 最大Pending ID4)1)1)consumer-1# 每个消费者的Pending数量2)32)1)consumer-22)2# 详细查看Pending消息127.0.0.1:6379XPENDING orders order-processors - 101)1)1716700000000-0# ID2)consumer-1# 消费者3)(integer)60000# 空闲时间(毫秒)4)(integer)3# 被投递次数# 5. 转移消息 # XCLAIM stream group consumer min-idle-time id [id ...]127.0.0.1:6379XCLAIM orders order-processors consumer-2600001716700000000-0# └─ 组名 └─ 目标消费者 └─ 空闲阈值 └─ 消息ID# 含义将空闲超过60秒的消息从consumer-1转移给consumer-24.3 消费者组的完整工作流消息从生产到消费的完整生命周期 1. 生产者发送消息 XADD orders * order_id 1001 amount 99.9 → 消息进入Stream状态: 未分配 2. 消费者拉取消息 XREADGROUP GROUP order-processors worker-1 STREAMS orders → 消息分配给worker-1进入Pending列表 → 状态: 待确认(pending) 3. 消费者处理成功 业务逻辑处理... XACK orders order-processors 1716700000000-0 → 消息从Pending中移除 → 状态: 已确认(done) 4. 消费者处理失败或者挂了 consumer-1 获取消息后崩溃没有XACK → 消息留在consumer-1的Pending列表中 → 空闲时间持续增长 → 状态: 挂起(stuck) 5. 其他消费者认领 XPENDING 发现某消息空闲超过阈值 XCLAIM orders order-processors worker-2 60000 1716700000000-0 → 消息转移给worker-2投递次数1 → 状态: 重新分配五、Java实战Spring Data Redis操作Stream5.1 生产者代码ComponentpublicclassStreamProducer{AutowiredprivateStringRedisTemplateredisTemplate;privatestaticfinalStringSTREAM_KEYorders;// 发送消息publicStringsendMessage(MapString,Stringfields){// 构建RecordMapRecordString,String,StringrecordStreamRecords.newRecord().ofMap(fields).withStreamKey(STREAM_KEY);// XADD自动分配IDRecordIdidredisTemplate.opsForStream().add(record);// 限制Stream长度防止内存溢出redisTemplate.opsForStream().trim(STREAM_KEY,10000);// 保留最新1万条returnid.getValue();// 返回 1716700000000-0}// 批量发送publicvoidsendBatch(ListMapString,Stringmessages){messages.forEach(this::sendMessage);}}5.2 消费者代码消费者组模式ComponentpublicclassStreamConsumer{AutowiredprivateStringRedisTemplateredisTemplate;privatestaticfinalStringSTREAM_KEYorders;privatestaticfinalStringGROUP_NAMEorder-processors;privatestaticfinalStringCONSUMER_NAMEworker-UUID.randomUUID().toString().substring(0,8);PostConstructpublicvoidinit(){// 创建消费者组如果不存在try{redisTemplate.opsForStream().createGroup(STREAM_KEY,GROUP_NAME);}catch(RedisSystemExceptione){// 组已存在忽略if(!e.getMessage().contains(BUSYGROUP)){throwe;}}// 启动消费循环startConsuming();}publicvoidstartConsuming(){newThread(()-{while(true){try{// 先处理Pending消息之前没ACK的processPending();// 再拉取新消息ListMapRecordString,Object,ObjectmessagesredisTemplate.opsForStream().read(Consumer.from(GROUP_NAME,CONSUMER_NAME),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(5)),StreamOffset.create(STREAM_KEY,ReadOffset.lastConsumed()));for(MapRecordString,Object,Objectmessage:messages){try{// 处理业务逻辑processOrder(message.getValue());// 确认消息redisTemplate.opsForStream().acknowledge(STREAM_KEY,GROUP_NAME,message.getId());}catch(Exceptione){log.error(消息处理失败: {},message.getId(),e);// 不ACK等待XCLAIM转移或重试}}}catch(Exceptione){log.error(消费循环异常,e);try{Thread.sleep(1000);}catch(InterruptedExceptionie){}}}},stream-consumer).start();}// 处理Pending消息兜底机制privatevoidprocessPending(){PendingMessagespendingMessagesredisTemplate.opsForStream().pending(STREAM_KEY,GROUP_NAME,PendingMessagesOptions.empty().range(Range.unbounded()).count(10));for(PendingMessagemsg:pendingMessages){// 空闲超过60秒的消息重新处理if(msg.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofSeconds(60))0){ListMapRecordString,Object,ObjectclaimedredisTemplate.opsForStream().claim(STREAM_KEY,GROUP_NAME,CONSUMER_NAME,Duration.ofSeconds(60),RecordId.of(msg.getIdAsString()));for(MapRecordString,Object,Objectrecord:claimed){try{processOrder(record.getValue());redisTemplate.opsForStream().acknowledge(STREAM_KEY,GROUP_NAME,record.getId());}catch(Exceptione){log.error(Pending消息重试失败: {},record.getId(),e);}}}}}}⚠️ 注意Spring Data Redis 3.x与2.x的Stream API变化较大。上面用的是3.x的API。如果你还在用2.x需要参考对应版本的文档核心逻辑是一样的只是类名和方法签名略有不同。六、Stream vs Kafka相似与不同6.1 架构对比Redis Stream ┌────────────────────────────────────────────┐ │ Redis 单节点(或Cluster) │ │ │ │ Stream: orders 消费者组: proc-1 │ │ ┌─────────────────┐ ┌──────┐ ┌──────┐ │ │ │ msg1 msg2 msg3 │ │Consumer1│Consumer2│ │ │ └─────────────────┘ └─────────┘ │ │ 外部存储 │ └────────────────────────────────────────────┘ 优势部署简单低延迟 劣势单Stream无分区受单机内存限制 Kafka ┌────────────────────────────────────────────┐ │ Kafka Broker集群 │ │ │ │ Topic: orders (3分区) │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Partition 0 │ │ Partition 1 │ ... │ │ │ 磁盘持久化 │ │ 磁盘持久化 │ │ │ └─────────────┘ └─────────────┘ │ │ 磁盘 TB级存储 │ │ 消费者组 跨分区消费 │ └────────────────────────────────────────────┘ 优势海量堆积、分区并行、持久化到磁盘 劣势运维复杂、延迟略高6.2 关键差异维度Redis StreamKafka分区/分片无原生分区分区是核心概念存储内存RDB/AOF备份磁盘顺序写入消息删除MAXLEN/XTRIM/手动DEL按时间或大小自动清理单Stream吞吐10万/s百万/s延迟1ms2-10ms消费者进度保存在Redis中保存在Kafka内部topic消息回溯支持只要没被trim支持可配置保留时间部署成本低你可能已有Redis高需独立集群6.3 XTRIM控制Stream长度# 精确裁剪保留最近1000条127.0.0.1:6379XTRIM orders MAXLEN1000# 近似裁剪性能更高但可能多保留几十条127.0.0.1:6379XTRIM orders MAXLEN ~1000# 按最小ID裁剪删除小于指定ID的消息127.0.0.1:6379XTRIM orders MINID1716700001000-0# XADD时同时限制长度一步到位127.0.0.1:6379XADD orders MAXLEN ~1000* field value⚠️ 注意XTRIM会删除消息如果有消费者还没消费到这些消息消息就永久丢失了。在设置MAXLEN时要确保所有消费者组都能在消息被trim之前消费完毕。建议给Stream预留足够的长度余量比如你日均消费1000条设置MAXLEN 5000比1000更安全。七、总结Redis Stream让Redis真正成为了一款可以当消息队列用的数据结构服务器。它的核心价值在于零额外成本你很可能已经在用Redis了Stream不需要新的部署消费者组解决了多消费者协同消费的难题消息ACK消费确认机制保障了消息可靠投递低延迟内存操作延迟可以做到亚毫秒级但它不是Kafka的替代品。面对TB级堆积、分区并行消费、严格顺序保证等场景专业的消息中间件仍然是最佳选择。记住一个判断标准如果你的团队已经在维护Kafka/RabbitMQ就别折腾Stream了如果你没有MQ且消息量不大百万级/天以下Stream是性价比最高的选择。下一篇文章我们将聊聊HyperLogLog——一个用12KB内存就能统计1亿用户的神奇数据结构。上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案下一篇【第68篇】HyperLogLog——用极小内存统计超大基数