RocketMQ源码深度解析(三)消息持久化机制 一、RocketMQ 消息持久化整体设计1.1 持久化核心价值RocketMQ 是磁盘存储型消息中间件所有消息默认落地磁盘持久化彻底解决服务重启、机器宕机导致的消息丢失问题实现消息可追溯、可重试、可恢复是分布式消息可靠性的核心基石。1.2 存储三大核心文件结构RocketMQ 所有消息存储由三类文件组成分工明确、层层协作摒弃了 Kafka 分区日志存储模式采用日志索引分离的极致设计CommitLog消息日志文件核心数据文件存储全部原始消息实体所有 Topic 的消息统一顺序写入包含完整消息体、属性、时间、偏移量等原始数据。由多个文件组成,每个文件1GConsumeQueue消费队列文件Topic 级别的索引文件轻量化存储不存消息内容仅存储「消息偏移量消息长度Tag哈希」专供消费者拉取消息使用。记录当前MessageQueue被哪些消费者组消费到了那一条CommitLog。IndexFile索引文件消息检索索引存储消息 Key、时间戳、CommitLog 偏移量支持业务 Key 精准查询、消息轨迹追溯。1.3 消息存储整体结构图简单来说Producer发过来的所有消息不管是属于那个TopicBroker都统一存在CommitLog文件当中然后分别构建ConsumeQueue文件和IndexFile两个索引文件用来辅助消费者进行消息检索。这种设计最直接的好处是可以较少查找目标文件的时间让消息以最快的速度落盘。对比Kafka存文件时需要寻找消息所属的Partition文件再完成写入。当Topic比较多时这样的Partition寻址就会浪费非常多的时间。所以Kafka不太适合多Topic的场景。而RocketMQ的这种快速落盘的方式在多Topic的场景下优势就比较明显了。然后在文件形式上CommitLog文件的大小是固定的。文件名就是当前CommitLog文件当中存储的第一条消息的Offset。ConsumeQueue文件主要是加速消费者进行消息索引。每个文件夹对应RocketMQ中的一个MessageQueue文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样消费者通过ConsumeQueue文件就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件中的消费进度会保存在config/consumerOffset.json文件当中。IndexFile文件主要是辅助消费者进行消息索引。消费者进行消息消费时通ConsumeQueue文件就足够完成消息检索了但是如果消费者指定时间戳进行消费或者要按照Meessageld或者MessageKey来检索文件比如RocketMQ管理控制台的消息轨迹功能ConsumeQueue文件就不够用了。IndexFile文件就是用来辅助这类消息检索的。他的文件名比较特殊不是以消息偏移量命名而是用的时间命名。但是其实他也是一个固定大小的文件。1.4 持久化核心优势 常见核心问题核心优势顺序写、随机读CommitLog 全程顺序追加写入磁盘IO效率极高规避机械磁盘随机写性能瓶颈。读写分离写入只操作 CommitLog消费、查询操作索引文件互不阻塞。轻量化索引ConsumeQueue、IndexFile 体积小、加载快极大提升消费和检索效率。统一存储多 Topic 消息统一存储无文件碎片化磁盘利用率高。生产常见核心问题为什么 RocketMQ 速度快核心顺序写磁盘 内存缓存 读写分离消息会不会丢失刷盘机制主从复制双重保障为什么支持海量消息堆积磁盘持久化过期文件自动删除消费速度为什么不受 Topic 数量影响统一 CommitLog 存储无分区文件瓶颈二、CommitLog 写入核心原理核心重点2.1 CommitLog 写入方式CommitLog 采用全局顺序追加写入模式所有 Topic、所有队列的消息全部按到达时间顺序追加写入文件末尾是 RocketMQ 高吞吐的核心根源。文件固定大小1GB写满自动新建下一个文件文件命名为起始偏移量如 00000000000000000000实现无缝滚动写入。2.2 写入加锁机制并发安全核心多生产者并发发送消息时为保证消息顺序写入、偏移量连续不混乱CommitLog 写入采用分段锁 全局写锁机制核心锁对象putMessageLockReentrantLock 可重入锁加锁时机消息落地 CommitLog 前统一加锁保证同一时刻只有一条消息执行写入锁粒度全局文件锁保证磁盘文件偏移量严格递增、消息有序存储解锁时机消息写入内存缓冲区完成后立即解锁不阻塞刷盘、后续流程核心作用解决并发写入导致的文件覆盖、偏移量错乱、消息丢失问题保证 CommitLog 日志绝对有序。2.3 消息写入完整流程Broker 接收 Producer 消息校验消息合法性、权限、大小竞争获取putMessageLock全局写锁计算当前文件写入偏移量判定文件是否写满满则新建文件消息封装为固定格式字节数组追加写入 PageCache 内存缓冲区更新全局最大偏移量释放写锁触发刷盘机制同步/异步落地磁盘返回消息写入成功结果记录消息偏移量2.4 核心写入源码片段// CommitLog 核心写入加锁逻辑 public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 1. 竞争全局写锁 putMessageLock.lock(); try { // 2. 获取当前写入文件判断是否需要滚动新建 MappedFile mappedFile this.mappedFileQueue.getLastMappedFile(); if (null mappedFile || mappedFile.isFull()) { mappedFile this.mappedFileQueue.getLastMappedFile(); } // 3. 写入PageCache缓冲区 long beginOffset mappedFile.getFileFromOffset() mappedFile.getWritePosition(); int wroteBytes mappedFile.appendMessage(msg, this.appendMessageCallback); // 4. 更新偏移量 if (wroteBytes 0) { this.maxOffset.addAndGet(wroteBytes); } } finally { // 5. 释放锁 putMessageLock.unlock(); } // 6. 执行刷盘逻辑 return flushMessage(msg); }putMessageLock可以根据配置信息选择是SpingLock自旋锁还是ReentrantLock可重入锁。自旋锁就是一直尝试CAS直到拿到锁。ReentrantLock做一次CAS拿不到就休眠直到前面线程unlock的时候唤醒继续竞争锁非公平。两者的区别在于如果写入的消息非常多竞争非常激烈适合用ReentrantLock减少CPU空转。竞争没有那么激烈则适合用自旋锁得到锁的速度更快。三、同步刷盘 异步刷盘机制3.1 核心原理区别操作系统会将磁盘写入操作先缓存到PageCache 内存页缓存写入缓存不代表落地磁盘机器宕机仍会丢失消息。刷盘机制就是将内存缓存数据强制落地磁盘的机制RocketMQ 提供两种刷盘模式。3.2 异步刷盘默认模式高性能原理消息写入 PageCache 内存缓冲区后立即返回写入成功不等待磁盘落地。后台专属刷盘线程定时批量将内存数据刷入磁盘。特点性能极高、吞吐量大适配绝大多数业务场景极端机器宕机、断电场景会丢失少量缓存消息默认刷盘间隔100ms 批量刷盘3.3 同步刷盘高可靠模式原理消息写入 PageCache 后阻塞等待刷盘完成磁盘落地成功后才返回生产者写入成功响应。特点零消息丢失数据绝对可靠磁盘IO阻塞性能吞吐量大幅下降适配金融、支付、订单等核心零丢失业务3.4 刷盘机制流程图3.5 刷盘核心源码Override public CompletableFuturePutMessageStatus handleDiskFlush(AppendMessageResult result, MessageExt messageExt) { // Synchronization flush 同步刷盘 if (FlushDiskType.SYNC_FLUSH CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) {//构建request的时候从配置文件中读取了刷盘超时时间默认5秒。 GroupCommitRequest request new GroupCommitRequest(result.getWroteOffset() result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); flushDiskWatcher.add(request);//这里只是监控刷盘是否超时。 service.putRequest(request);//实际进行刷盘刷盘操作先排队再执⾏。 return request.future(); } else { service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } // Asynchronous flush else { if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {//默认false flushCommitLogService.wakeup(); } else { commitRealTimeService.wakeup(); } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } }同步刷盘中,会采用读写队列双缓存的设计,有效提高高并发场景下数据一致性问题四、CommitLog 主从复制机制4.1 机制作用单节点 Broker 存在单点故障RocketMQ 通过主从同步复制实现消息副本备份主节点 Master 负责读写从节点 Slave 同步数据、提供读兜底实现高可用、故障自动切换。4.2 完整同步流程连接建立Slave 启动后主动连接 Master 节点上报自身已同步的最大偏移量数据拉取Master 根据 Slave 偏移量推送未同步的 CommitLog 数据增量同步仅同步增量数据不重复同步历史数据提升同步效率落地存储Slave 接收数据后写入本地 CommitLog 并执行刷盘心跳保活主从定时心跳维持同步连接断连后自动重连续传4.3 主从复制核心关注点异步复制Master 写入成功即返回不等待 Slave 同步完成性能高短暂宕机可能丢失少量数据同步复制Master 等待 Slave 同步完成后再返回数据零丢失性能较低偏移量续传支持断点续传重启后从上次偏移量继续同步无需全量同步读写分离默认消费优先从 Slave 读取减轻 Master 压力五、ConsumeQueue 和 IndexFile 分发机制5.1 分发核心逻辑CommitLog 存储全量消息但消费者无法直接遍历大文件消费。RocketMQ 启动后台异步分发线程reputMessageService将 CommitLog 原始消息拆解生成两类索引文件。5.2 ConsumeQueue 消费队列分发流程ConsumeQueue 是Topic队列维度的轻量化索引每条索引固定 20 字节结构CommitLog偏移量(8byte) 消息长度(4byte) Tag哈希值(8byte)。分发线程扫描最新写入的 CommitLog 数据根据消息的 Topic、QueueId 归属对应队列提取消息偏移量、长度、Tag哈希生成索引条目追加写入对应 ConsumeQueue 文件消费者消费时先读取 ConsumeQueue 索引再精准定位 CommitLog 原始消息5.3 IndexFile 索引文件分发流程IndexFile 用于消息精准检索支持根据业务 Key、时间戳查询消息。解析消息自定义 Keys 属性将 Key、消息时间戳、CommitLog 偏移量封装为索引写入 IndexFile 哈希索引结构支持后台快速检索、消息轨迹排查、异常回溯5.4 分发核心特点异步分发不阻塞消息写入主流程不影响吞吐解耦读写写入只操作 CommitLog消费查询只操作索引文件极致轻量化索引文件体积极小百万级消息索引仅占用少量磁盘六、过期文件自动删除机制6.1 机制作用RocketMQ 消息默认保留72小时过期消息自动清理避免磁盘无限膨胀同时保留可追溯窗口期平衡磁盘占用与消息可回溯性。删除的时候并未关注消息是否删除,会导致业务数据受损6.2 删除核心规则默认保留时长72h可自定义配置删除对象过期的 CommitLog、ConsumeQueue、IndexFile 完整文件删除前提文件内所有消息均过期不会删除正在写入的活跃文件磁盘保护磁盘使用率过高时强制提前删除过期文件防止磁盘打满6.3 过期文件删除流程后台定时清理线程默认每 10s 执行一次遍历所有存储文件获取文件最后更新时间对比系统当前时间判断文件是否超过保留时长校验文件是否为活跃写入文件非活跃且过期则标记待删除执行文件删除同步释放磁盘空间更新文件队列索引保证新消息正常写入6.4 过期文件源码解析入口DefaultMessageStore.addScheduleTask - DefaultMessageStore.this.cleanFilesPeriodically() 和DefaultMessageStore.this.cleanQueueFilesPeriodically()在这个方法中会启动两个线程cleanCommitLogService用来删除过期的CommitLog文件cleanConsumeQueueService用来删除过期的ConsumeQueue和IndexFile文件。在删除CommitLog文件时Broker会启动后台线程每60秒检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说默认情况下RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。触发过期文件删除时有两个检查的纬度一个是是否到了触发删除的时间也就是broker.conf里配置的deleteWhen属性。另外还会检查磁盘利用率达到阈值也会触发过期文件删除。这个阈值默认是72%可以在broker.conf文件当中定制。但是最大值为95最小值为10。然后在删除ConsumeQueue和IndexFile文件时会去检查CommitLog当前的最小Offset然后在删除时进行对齐。需要注意的是RocketMQ在删除过期CommitLog文件时并不检查消息是否被消费过。所以如果有消息长期没有被消费是有可能直接被删除掉造成消息丢失的。6.4 过期文件删除流程图七、三大文件索引结构详解7.1 CommitLog 结构不定长文件单文件 1GB顺序追加存储完整原始消息数据包含消息头、消息体、属性、时间戳、队列ID、偏移量、CRC校验码等全量信息是消息的唯一数据源。由于该文件的大小固定,所以存储时单元长度是不一致的,因此每个数据前会存储数据的字节长度,计算规则如下:public static int calMsgLength(MessageVersion messageVersion, int sysFlag, int bodyLength, int topicLength, int propertiesLength) { int bornhostLength (sysFlag MessageSysFlag.BORNHOST_V6_FLAG) 0 ? 8 : 20; int storehostAddressLength (sysFlag MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) 0 ? 8 : 20; return 4 //TOTALSIZE 4 //MAGICCODE 4 //BODYCRC 4 //QUEUEID 4 //FLAG 8 //QUEUEOFFSET 8 //PHYSICALOFFSET 4 //SYSFLAG 8 //BORNTIMESTAMP bornhostLength //BORNHOST 8 //STORETIMESTAMP storehostAddressLength //STOREHOSTADDRESS 4 //RECONSUMETIMES 8 //Prepared Transaction Offset 4 (Math.max(bodyLength, 0)) //BODY messageVersion.getTopicLengthSize() topicLength //TOPIC 2 (Math.max(propertiesLength, 0)); //propertiesLength }7.2 ConsumeQueue 结构定长 20 字节单条索引纯索引无业务数据结构固定8ByteCommitLog 物理偏移量4Byte消息序列化长度8Byte消息 Tag 哈希值优势定长结构可通过偏移量快速定位索引消费效率极高。7.3 IndexFile 索引结构采用哈希索引链表结构解决消息 Key 重复问题索引头存储文件基础信息、索引数量哈希槽映射消息 Key 哈希值索引链表相同哈希值的消息形成链表支持冲突解决存储内容消息Key、时间戳、CommitLog偏移量、下一个索引位置作用支撑业务维度精准消息检索是消息排查、轨迹追溯的核心。