长会话状态治理下数据更新机制、并发保护与可复用设计原则系列导航本文是长会话状态治理系列的下篇聚焦于业务成功后怎么安全写下检查点以及可复用的设计原则。上篇讲解了问题分析、存储分层设计和恢复机制的完整实现见 长会话状态治理上。一、回顾与本篇定位在上篇中我们分析了长会话运行态为什么天然脆弱介绍了 Redis Hot Snapshot Cold Snapshot Turn Archive 的三层存储分层设计并深入剖析了以ensureRuntime(...)为核心的恢复机制——当 Redis 运行态缺失时如何从检查点材料中安全地恢复会话状态。但恢复机制只是闭环的一半。用一句话来概括两条主线的关系更新机制负责生产恢复材料恢复机制负责消费恢复材料。没有更新机制持续产出高质量的检查点再好的恢复入口也巧妇难为无米之炊。本篇将深入讲解数据更新机制的完整实现——从触发时机、防抖聚合、CAS 重试循环到 Patch 差量更新、单调性校验、幂等补偿再到轮次归档与软回放幂等的详细设计最后提炼出适用于任何长会话系统的七条可复用原则。二、数据更新机制业务成功后怎么安全写下检查点2.1 核心目标数据更新机制的核心目标是当业务成功推进后把最新状态安全写成可恢复检查点供后续恢复链路使用。它不是简单的save 一下对象因为这里的状态不是普通 CRUD。它面对的是高频更新、并发更新、热冷数据变化频率不同、请求会重试、结果可能半成功这些现实挑战。2.2 触发时机更新机制的入口分布在关键业务节点之后入口方法触发时机快照级别是否强制刷盘refreshAfterQuestionExtraction(sessionId)出题完成QUESTION_READY否可防抖refreshAfterDemeanorEvaluated(sessionId)神态评分完成ACTIVE否refreshAfterAnswerCommitted(sessionId, requestId, turnLog)答题提交成功ACTIVE是强制刷盘refreshAfterFinalize(sessionId)面试结束FINALIZED是强制刷盘每个入口都会构造一个HotRefreshRequest携带触发类型、requestId 和已提交的 turnLog// 答题提交成功时publicstaticHotRefreshRequestanswerCommitted(StringsessionId,StringrequestId,InterviewTurnLogturn){returnHotRefreshRequest.builder().sessionId(sessionId).trigger(ANSWER_COMMITTED).snapshotLevel(ACTIVE).requestId(requestId).committedTurn(turn).persistTurnArchive(true)// 需要归档.forceFlush(true)// 强制立即刷盘.build();}// 出题完成时publicstaticHotRefreshRequestquestionReady(StringsessionId){returnHotRefreshRequest.builder().sessionId(sessionId).trigger(QUESTION_READY).snapshotLevel(QUESTION_READY).persistTurnArchive(false)// 不需要归档.forceFlush(false)// 可以防抖.build();}注意forceFlush的区别答题提交和面试结束是关键检查点必须立即刷盘出题完成等中间态可以延迟合并减少写压力。2.3 HotRefreshCoordinator防抖与聚合每次业务成功后不是直接去写 Mongo而是先把刷新意图提交给HotRefreshCoordinator。这个 Coordinator 做了三件事1. 按 session 聚合同一 session 在短时间内多次刷新意图会被合并到一个 Bucket 中。2. 防抖延迟普通中间态刷新会延迟一个 debounce window默认 150ms等待后续意图合并。最大聚合窗口默认为 500ms确保不会无限延迟。3. 关键检查点立即刷盘forceFlush标记的刷新会跳过防抖等待当前 flush 完成后立即执行。submit(request) │ ├── Coordinator 未启用 → 直接 flush同步执行 │ ├── forceFlush true答题提交、面试结束等 │ └── 合并到 Bucket │ └── 如果当前有 flush 正在执行 │ │ └── 等待最多 600ms直到当前 flush 完成 │ └── 立即执行 flushBucket() │ └── forceFlush false出题完成等中间态 └── 合并到 Bucket └── 如果当前没有 flush 正在执行 └── 延迟 debounceWindow150ms后调度执行Bucket 内部通过 synchronized 保证线程安全merge 逻辑会把多次刷新的信息合并保留最新的 snapshotLevel、最新的 requestId 和 turnLog合并 persistTurnArchive 标记。Coordinator 内部用ConcurrentHashMapsessionId, Bucket管理所有活跃 session 的刷新意图。当某个 session 的所有刷新都已完成且没有新的意图时对应的 Bucket 会被清理避免内存泄漏。2.4 refreshSnapshot刷新核心流程refreshSnapshot(...)是数据更新机制的真正核心。它的完整执行流程如下refreshSnapshot(sessionId, snapshotLevel, requestId, committedTurn, persistTurnArchive) │ ├── Step 1: 加载当前状态 │ ├── 从 DB 加载 InterviewSession │ ├── 从 DB 加载 InterviewQuestion │ ├── 从 Mongo 加载当前 Hot Snapshot │ └── 从 Mongo 加载当前 Cold Snapshot │ ├── Step 2: 进入 CAS 重试循环最多 3 次 │ │ │ ├── Step 2a: 从 Redis DB 组装最新状态 │ │ ├── resolveQuestions(): 优先 Redis → 降级 DB │ │ ├── resolveSuggestions(): 优先 Redis → 降级 DB │ │ ├── resolveResumeContext(): 优先 Redis → 降级 DB │ │ ├── resolveTurns(): Redis → Archive → Snapshot → committedTurn │ │ ├── resolveFlow(): Redis → 终态推导 → turns 推导 → 初始 Flow │ │ └── resolveScoreAggregate(): 从 turns 重新计算 sum/count │ │ │ ├── Step 2b: 归档当前轮次如果需要 │ │ └── archiveTurn(): │ │ ├── 先检查同一 requestId 是否已归档幂等 │ │ ├── 如果已归档返回已有 seq │ │ └── 否则追加一条 Turn Archiveseq 单调递增 │ │ │ ├── Step 2c: 构建 HotPatch增量更新载体 │ │ └── buildHotPatch(): │ │ ├── snapshotVersion: current 1 │ │ ├── flow / scoreAggregate / recentTurns (上限 20) │ │ ├── archiveWatermark / lastTurnSeq │ │ ├── lastMutationId / lastAppliedRequestId │ │ └── lastCommittedQuestionNumber / lastCommittedTurnDigest │ │ │ ├── Step 2d: 三重保护检查 │ │ ├── shouldSkipHotPatch? → 所有字段一致跳过写入 │ │ ├── seedHotSnapshot? → 快照不存在初始化种子 │ │ ├── isMutationAlreadyApplied? → 同一 requestId 已处理跳过 │ │ └── validateHotPatchMonotonicity → 单调性校验 │ │ │ ├── Step 2e: CAS 写入 │ │ └── compareAndSetPatch(sessionId, expectedVersion, patch) │ │ ├── 成功 → 跳出循环 │ │ └── 失败 → 检查最新版本是否已包含同一 mutationId │ │ ├── 是 → 幂等命中安全跳过 │ │ └── 否 → backoff 后重试 │ │ │ └── Step 2f: CAS 重试退避 │ └── backoff baseDelay × (attempt 1) random(10~30ms) │ ├── Step 3: 条件触发冷快照更新 │ └── applyColdSnapshotIfNecessary(): │ ├── 如果 snapshotLevel ACTIVE 且冷快照已存在 → 跳过 │ └── 否则写入冷快照questions/suggestions/resumeContext 等 │ └── Step 4: 返回结果 ├── 成功 → true └── CAS 重试耗尽 → false打 warn 日志这里有几个值得注意的设计决策为什么每轮都重新组装状态因为在 CAS 重试循环中当前线程可能在等待期间被其他线程抢先更新了快照。每次循环都重新从 Redis 和 DB 读取最新数据确保构建出来的 Patch 是基于最新版本的状态。为什么 recentTurns 限制为 20 轮热快照的recentTurns只需要保留最近 N 轮用于快速恢复上下文。全量历史由 Turn Archive 承担。这避免了热快照文档随面试推进无限膨胀。2.5 Patch 而非整包覆盖每次更新不是把整个文档重写而是通过HotPatch/ColdPatch做字段级增量更新。以HotSnapshotRepositoryImpl为例它构建的 MongoUpdate对象只 set 有值的字段privateUpdatebuildUpdate(StringsessionId,HotPatchpatch,Datenow,booleanwithOnInsert){UpdateupdatenewUpdate();if(withOnInsert){update.setOnInsert(sessionId,sessionId);// upsert 时设置update.setOnInsert(createTime,now);}// 逐字段按需 setif(patch.getFlow()!null)update.set(flow,patch.getFlow());if(patch.getScoreAggregate()!null)update.set(scoreAggregate,patch.getScoreAggregate());if(patch.getRecentTurns()!null)update.set(recentTurns,patch.getRecentTurns());if(patch.getArchiveWatermark()!null)update.set(archiveWatermark,patch.getArchiveWatermark());if(patch.getLastMutationId()!null)update.set(lastMutationId,patch.getLastMutationId());// ... 其余字段同理update.set(updateTime,now);returnupdate;}冷快照的 Patch 逻辑类似但更新的字段是低频材料questions / suggestions / resumeContext / demeanorScore 等并且采用 upsert 语义——如果文档不存在则创建存在则按字段更新。Patch 的好处避免并发场景下后写覆盖前写——每次 Patch 只更新变化的字段不触碰其他字段减少 Mongo 写入量——不需要每次都把整个文档序列化并传输天然支持热冷分层——热快照和冷快照独立 Patch互不干扰2.6 CAS 并发保护热快照的更新采用Compare-And-Set语义这是保证并发安全的核心publicbooleancompareAndSetPatch(StringsessionId,LongexpectedVersion,HotPatchpatch){QueryqueryQuery.query(Criteria.where(sessionId).is(sessionId).and(snapshotVersion).is(expectedVersion)// CAS 条件);UpdateupdatebuildUpdate(sessionId,patch,now,false);UpdateResultresultmongoTemplate.updateFirst(query,update,HotSnapshot.class);returnresult.getModifiedCount()0;}只有当前版本号snapshotVersion与预期一致时才会写入成功。如果并发请求导致版本号已经前进CAS 会失败随后触发退避重试参数值说明最大重试次数3避免无限重试基础退避20ms × (attempt 1)线性递增随机抖动1030ms避免多个线程同时重试惊群效应CAS 保证了即使多个线程同时在刷新热快照也不会出现旧版本覆盖新版本的情况。为什么冷快照不需要 CAS冷快照的更新频率低出题完成、神态评分完成等关键节点并发冲突概率小。而且冷快照更新采用的是 upsert 字段级 Patch即使并发写入也不会互相覆盖只是可能重复设置相同值。这是一种有意识的简化——用最小的复杂度覆盖最大的风险。2.7 单调性保护在 CAS 之前还有一层validateHotPatchMonotonicity校验确保关键业务指标不会回退privatevoidvalidateHotPatchMonotonicity(HotSnapshotcurrent,HotPatchpatch){// 1. 轮次序号不能回退if(patch.getLastTurnSeq()current.getLastTurnSeq())thrownewIllegalStateException(hot snapshot lastTurnSeq regressed);// 2. 归档水位不能回退if(patch.getArchiveWatermark()current.getArchiveWatermark())thrownewIllegalStateException(hot snapshot archiveWatermark regressed);// 3. 计分次数不能减少if(patch.getScoreAggregate().getScoreCount()current.getScoreAggregate().getScoreCount())thrownewIllegalStateException(hot snapshot scoreCount regressed);// 4. 流程进度不能倒退非终态下if(!current.getFlow().isCompleted()patch.getFlow().getCurrentIndex()current.getFlow().getCurrentIndex())thrownewIllegalStateException(hot snapshot flow index regressed);}如果检测到回退直接抛出IllegalStateException阻止脏数据写入。这层校验相当于在 CAS 的版本号一致性之上又加了一层业务语义一致性的保护。为什么需要两层保护CAS 只能保证我的更新是基于最新版本做的但不能保证我构建出来的 Patch 内容本身是正确的。比如某个旧请求在重试时基于过期的 Redis 数据构建了一个 flow虽然 CAS 会拦住它版本号不对但如果恰好版本号一致极端场景单调性校验就是最后一道防线。2.8 幂等补偿热快照中记录了lastMutationId通常是 requestId。在刷新前和 CAS 失败后都会检查privatebooleanisMutationAlreadyApplied(HotSnapshothotSnapshot,StringmutationId){returnStrUtil.equals(hotSnapshot.getLastMutationId(),mutationId);}第一次检查CAS 之前如果当前快照的lastMutationId已经等于本次 requestId说明之前的请求已经成功写入直接跳过。第二次检查CAS 失败之后如果 CAS 失败重新加载最新版本的快照再做一次lastMutationId检查。如果最新版本的 mutationId 与当前请求一致说明另一个并发线程已经成功写入了同一个变更当前请求可以安全跳过。这保证了即使前端超时重试同一个 requestId 不会导致状态被重复推进。2.9 跳过快照更新当新旧快照的所有关键字段完全一致时系统会跳过无意义的写入privatebooleanshouldSkipHotPatch(HotSnapshotcurrent,HotPatchpatch){returnObjects.equals(current.getUserId(),patch.getUserId())StrUtil.equals(current.getSessionStatus(),patch.getSessionStatus())Objects.equals(current.getFlow(),patch.getFlow())Objects.equals(current.getScoreAggregate(),patch.getScoreAggregate())Objects.equals(current.getRecentTurns(),patch.getRecentTurns())// ... 全部字段对比;}这避免了在防抖窗口合并后、或重试场景下执行完全重复的 Mongo 写入。虽然 Mongo 的 upsert 本身是幂等的但跳过无意义的写入可以减少 IO 开销和版本号递增。三、轮次归档与软回放幂等3.1 Turn Archive 的设计每次答题成功提交后除了更新热快照的recentTurns还会向Turn Archive追加一条归档记录。归档实体结构非常精简Document(collectioninterview_session_turn_archive)publicclassInterviewSessionTurnArchive{IdprivateStringid;IndexedprivateStringsessionId;// 会话标识IndexedprivateStringrequestId;// 请求标识幂等键IndexedprivateLongseq;// 单调递增序号privateLongsnapshotVersion;// 关联的快照版本privateInterviewTurnLogturnPayload;// 完整轮次数据CreatedDateprivateDatecreatedAt;}Turn Archive 是全量追加、永不修改的。它承担两个职责全量历史回放恢复时可以按seq升序加载所有轮次得到完整的面试对话历史软回放幂等通过requestId判断某个请求是否已经成功处理过归档写入本身也做了幂等保护privateLongarchiveTurn(StringsessionId,StringrequestId,InterviewTurnLogturn,Longversion){// 先检查同一 requestId 是否已归档if(StrUtil.isNotBlank(requestId)){OptionalTurnArchiveexistingturnArchiveRepo.findBySessionIdAndRequestId(sessionId,requestId);if(existing.isPresent()){returnexisting.get().getSeq();// 已归档直接返回已有 seq}}// 否则追加新归档seq 从最大 seq 1 开始longnextSeqturnArchiveRepo.findFirstBySessionIdOrderBySeqDesc(sessionId).map(a-a.getSeq()1L).orElse(1L);// ... 保存returnnextSeq;}3.2 软回放幂等的三重匹配当重复请求进来时findReplayResponse(...)会通过三重机制识别重复findReplayTurn(snapshot, requestId, questionNumber, answerContent) │ ├── 第一重按 requestId 匹配 recentTurns │ └── 从后往前遍历 recentTurns查找 requestId 一致的 turn │ └── 命中 → 直接回放该 turn 的结果 │ ├── 第二重按 turnDigest 匹配 recentTurns │ └── SHA256(题号 | 答案前1000字符) │ └── 从后往前遍历 recentTurns查找 digest 一致的 turn │ └── 命中 → 直接回放 │ └── 第三重按 lastCommittedTurnDigest 匹配 Archive └── 如果热快照的 lastCommittedTurnDigest 与当前请求一致 └── 加载 Turn Archive 的最后一条记录 └── 命中 → 回放该归档的结果三重匹配的设计考虑了不同的降级场景第一重依赖 requestId最常见也最可靠第二重依赖内容摘要在 requestId 丢失时兜底第三重依赖热快照的摘要字段 归档在 recentTurns 被截断时兜底其中turnDigest的计算方式值得注意——它取题号和答案内容的前 1000 个字符做 SHA256privateStringbuildTurnDigest(StringquestionNumber,StringanswerContent){returnDigestUtil.sha256Hex(normalizeQuestionNumber(questionNumber)|truncateAnswer(answerContent));}privateStringtruncateAnswer(StringanswerContent){returnanswerContentnull?:answerContent.length()1000?answerContent:answerContent.substring(0,1000);}截取 1000 字符是为了平衡匹配精度和性能。在正常业务场景下同一个题号 同一个用户的答案内容前 1000 字符已经足够唯一。3.3 回放响应的构建一旦匹配到已处理的 turn系统会直接从 turn 的数据构建完整响应而不是重新执行答题链路privateInterviewAnswerRespDTObuildReplayResponse(InterviewTurnLogturn){InterviewAnswerRespDTOresponseInterviewAnswerRespDTO.init();response.withCurrentQuestion(turn.getQuestionNumber(),turn.getQuestionContent());response.withEvaluation(turn.getScore(),turn.getFeedback(),turn.getTotalScore());if(Boolean.TRUE.equals(turn.getFinished())){response.finish().success();returnresponse;}response.withNextQuestion(turn.getNextQuestionNumber(),turn.getNextQuestion(),isFollowUp,followUpCount).success();returnresponse;}这保证了即使用户因为网络超时重试系统也不会重新评估答案、重新推进流程、重新计分——而是精准地回放上一次的结果。四、架构总览将上下两篇讲解的所有组件放在一起整个状态治理体系的协作关系如下┌──────────────────────────┐ │ 业务请求入口 │ │ (Answer Pipeline 等) │ └────────────┬─────────────┘ │ ┌──────────▼──────────┐ │ ensureRuntime(...) │ ◄── 恢复机制入口上篇 │ (RehydrateService) │ └──────────┬──────────┘ │ ┌───────────────┴───────────────┐ │ isRuntimeReady(scope)? │ ├── YES ──► CACHE EXACT │ └── NO │ │ │ ┌────────▼────────┐ │ │ 分布式锁竞争 │ │ └────────┬────────┘ │ ┌──────┴──────┐ │ Owner│ │Follower │ │ │ │ ┌────────▼───┐ ┌─────▼────────┐ │ │ 优先 Snapshot│ │ 轮询等待 │ │ │ 恢复 │ │ (4×80ms) │ │ └──────┬──────┘ └─────┬────────┘ │ │ │ │ ┌──────▼───┐ │ │ │ 降级材料 │ │ │ │ 推导恢复 │ │ │ └──────┬───┘ │ │ │ │ │ ▼ ▼ │ 写回 Redis 返回 RuntimeView │ │ ──────────── 业务继续推进 ──────────────────────── │ │ ┌──────────────────────────┐ │ │ 业务成功后触发刷新 │ │ │ refreshAfterXxx(...) │ ◄┘ 更新机制入口本篇 └────────────┬─────────────┘ │ ┌──────────▼──────────┐ │ HotRefreshCoordinator│ ◄── 防抖 聚合 │ (按 session 聚合) │ └──────────┬──────────┘ │ ┌──────────▼──────────┐ │ refreshSnapshot(...) │ │ ┌─────────────────┐ │ │ │ Patch CAS │ │ │ │ 单调性校验 │ │ │ │ 幂等补偿 │ │ │ └─────────────────┘ │ └──────────┬──────────┘ │ ┌────────────┼────────────┐ │ │ │ Hot Snapshot Cold Snapshot Turn Archive (CAS 更新) (按需更新) (追加写入)五、设计总结可复用的七条核心原则从这套实现中可以提炼出适用于任何长会话系统的通用原则原则一承认运行态会缺失提前设计好恢复入口不要幻想 Redis 永不失手。只要会话足够长、请求足够多状态缺口几乎必然出现。关键是提前准备好恢复入口统一的ensureRuntime(...)所有业务请求必须先过这一关恢复依据Hot Snapshot Cold Snapshot Turn Archive 三级检查点恢复边界Confidence 告诉你恢复结果可不可靠Scope 告诉你恢复范围够不够原则二热冷分层降低写放大高频变化的状态flow、score、turns和低频变化的材料questions、resume context分开存储。每次业务推进只更新热快照避免无谓的冷数据重写。热快照走 CAS 精细控制冷快照走 upsert 宽松更新。原则三差量更新Patch代替整包覆盖用字段级 Patch 代替整个文档 rewrite。好处是避免并发场景下后写覆盖前写减少 Mongo 写入量和网络传输天然支持热冷分层——热快照和冷快照独立 Patch原则四CAS 单调性校验双重并发保护CAS 保证版本号一致性单调性校验保证业务语义一致性。两者结合即使多个线程同时在刷新热快照也不会出现旧版本覆盖新版本或状态回退的情况。原则五幂等补偿兜底重试场景通过lastMutationIdrequestIdturnDigest三重机制识别重复请求保证超时重试不会导致状态被重复推进。在 CAS 失败后也做一次幂等检查避免误判为版本冲突而错误重试。原则六Owner-Follower 避免并发恢复浪费同一 session 的并发恢复请求只需要一个 owner 执行恢复其他 follower 等待并复用结果。这在高并发场景下尤为重要——它避免了重复的 Mongo 查询、Redis 写入和状态计算。原则七恢复结果带置信度上层按级处理恢复机制不返回简单的 boolean而是返回带有 Confidence 和 RestoreSource 的视图对象。上层业务可以根据置信度决定是否继续写入EXACT/DERIVED→ 可以继续推进业务READ_ONLY→ 只能提供查询不能推进TERMINAL→ 会话已结束直接回放这避免了在不可靠的状态上做出不可逆的业务决策。六、写在最后这套方案最终解决的问题不是永远不丢状态而是**“丢了也能恢复”**。它把一个看似不可能完成的任务——保证长会话运行态永远完整——转化成了一个可工程化落地的方案只要检查点还在恢复入口就能把记忆找回来让这场会话正确地走完。从一个更高的视角来看这套方案的本质不是某一个技术点的巧妙运用而是一种对长会话状态本质的认知升级长会话运行态是一种高脆弱、高时序、高并发敏感的数据形态不能用普通缓存思路去管理它必须建立一套可恢复的长会话状态治理体系——热层承接高频读写持久层沉淀恢复材料懒恢复入口保障兜底并发保护防止写乱幂等补偿兜底重试理解了这一点热冷分层、懒恢复、Patch CAS、单调性校验和幂等补偿就不再是一堆零散的技术技巧而是一套完整方案中不可或缺的组成部分。
长会话状态治理(下):数据更新机制、并发保护与可复用设计原则
发布时间:2026/6/13 9:37:10
长会话状态治理下数据更新机制、并发保护与可复用设计原则系列导航本文是长会话状态治理系列的下篇聚焦于业务成功后怎么安全写下检查点以及可复用的设计原则。上篇讲解了问题分析、存储分层设计和恢复机制的完整实现见 长会话状态治理上。一、回顾与本篇定位在上篇中我们分析了长会话运行态为什么天然脆弱介绍了 Redis Hot Snapshot Cold Snapshot Turn Archive 的三层存储分层设计并深入剖析了以ensureRuntime(...)为核心的恢复机制——当 Redis 运行态缺失时如何从检查点材料中安全地恢复会话状态。但恢复机制只是闭环的一半。用一句话来概括两条主线的关系更新机制负责生产恢复材料恢复机制负责消费恢复材料。没有更新机制持续产出高质量的检查点再好的恢复入口也巧妇难为无米之炊。本篇将深入讲解数据更新机制的完整实现——从触发时机、防抖聚合、CAS 重试循环到 Patch 差量更新、单调性校验、幂等补偿再到轮次归档与软回放幂等的详细设计最后提炼出适用于任何长会话系统的七条可复用原则。二、数据更新机制业务成功后怎么安全写下检查点2.1 核心目标数据更新机制的核心目标是当业务成功推进后把最新状态安全写成可恢复检查点供后续恢复链路使用。它不是简单的save 一下对象因为这里的状态不是普通 CRUD。它面对的是高频更新、并发更新、热冷数据变化频率不同、请求会重试、结果可能半成功这些现实挑战。2.2 触发时机更新机制的入口分布在关键业务节点之后入口方法触发时机快照级别是否强制刷盘refreshAfterQuestionExtraction(sessionId)出题完成QUESTION_READY否可防抖refreshAfterDemeanorEvaluated(sessionId)神态评分完成ACTIVE否refreshAfterAnswerCommitted(sessionId, requestId, turnLog)答题提交成功ACTIVE是强制刷盘refreshAfterFinalize(sessionId)面试结束FINALIZED是强制刷盘每个入口都会构造一个HotRefreshRequest携带触发类型、requestId 和已提交的 turnLog// 答题提交成功时publicstaticHotRefreshRequestanswerCommitted(StringsessionId,StringrequestId,InterviewTurnLogturn){returnHotRefreshRequest.builder().sessionId(sessionId).trigger(ANSWER_COMMITTED).snapshotLevel(ACTIVE).requestId(requestId).committedTurn(turn).persistTurnArchive(true)// 需要归档.forceFlush(true)// 强制立即刷盘.build();}// 出题完成时publicstaticHotRefreshRequestquestionReady(StringsessionId){returnHotRefreshRequest.builder().sessionId(sessionId).trigger(QUESTION_READY).snapshotLevel(QUESTION_READY).persistTurnArchive(false)// 不需要归档.forceFlush(false)// 可以防抖.build();}注意forceFlush的区别答题提交和面试结束是关键检查点必须立即刷盘出题完成等中间态可以延迟合并减少写压力。2.3 HotRefreshCoordinator防抖与聚合每次业务成功后不是直接去写 Mongo而是先把刷新意图提交给HotRefreshCoordinator。这个 Coordinator 做了三件事1. 按 session 聚合同一 session 在短时间内多次刷新意图会被合并到一个 Bucket 中。2. 防抖延迟普通中间态刷新会延迟一个 debounce window默认 150ms等待后续意图合并。最大聚合窗口默认为 500ms确保不会无限延迟。3. 关键检查点立即刷盘forceFlush标记的刷新会跳过防抖等待当前 flush 完成后立即执行。submit(request) │ ├── Coordinator 未启用 → 直接 flush同步执行 │ ├── forceFlush true答题提交、面试结束等 │ └── 合并到 Bucket │ └── 如果当前有 flush 正在执行 │ │ └── 等待最多 600ms直到当前 flush 完成 │ └── 立即执行 flushBucket() │ └── forceFlush false出题完成等中间态 └── 合并到 Bucket └── 如果当前没有 flush 正在执行 └── 延迟 debounceWindow150ms后调度执行Bucket 内部通过 synchronized 保证线程安全merge 逻辑会把多次刷新的信息合并保留最新的 snapshotLevel、最新的 requestId 和 turnLog合并 persistTurnArchive 标记。Coordinator 内部用ConcurrentHashMapsessionId, Bucket管理所有活跃 session 的刷新意图。当某个 session 的所有刷新都已完成且没有新的意图时对应的 Bucket 会被清理避免内存泄漏。2.4 refreshSnapshot刷新核心流程refreshSnapshot(...)是数据更新机制的真正核心。它的完整执行流程如下refreshSnapshot(sessionId, snapshotLevel, requestId, committedTurn, persistTurnArchive) │ ├── Step 1: 加载当前状态 │ ├── 从 DB 加载 InterviewSession │ ├── 从 DB 加载 InterviewQuestion │ ├── 从 Mongo 加载当前 Hot Snapshot │ └── 从 Mongo 加载当前 Cold Snapshot │ ├── Step 2: 进入 CAS 重试循环最多 3 次 │ │ │ ├── Step 2a: 从 Redis DB 组装最新状态 │ │ ├── resolveQuestions(): 优先 Redis → 降级 DB │ │ ├── resolveSuggestions(): 优先 Redis → 降级 DB │ │ ├── resolveResumeContext(): 优先 Redis → 降级 DB │ │ ├── resolveTurns(): Redis → Archive → Snapshot → committedTurn │ │ ├── resolveFlow(): Redis → 终态推导 → turns 推导 → 初始 Flow │ │ └── resolveScoreAggregate(): 从 turns 重新计算 sum/count │ │ │ ├── Step 2b: 归档当前轮次如果需要 │ │ └── archiveTurn(): │ │ ├── 先检查同一 requestId 是否已归档幂等 │ │ ├── 如果已归档返回已有 seq │ │ └── 否则追加一条 Turn Archiveseq 单调递增 │ │ │ ├── Step 2c: 构建 HotPatch增量更新载体 │ │ └── buildHotPatch(): │ │ ├── snapshotVersion: current 1 │ │ ├── flow / scoreAggregate / recentTurns (上限 20) │ │ ├── archiveWatermark / lastTurnSeq │ │ ├── lastMutationId / lastAppliedRequestId │ │ └── lastCommittedQuestionNumber / lastCommittedTurnDigest │ │ │ ├── Step 2d: 三重保护检查 │ │ ├── shouldSkipHotPatch? → 所有字段一致跳过写入 │ │ ├── seedHotSnapshot? → 快照不存在初始化种子 │ │ ├── isMutationAlreadyApplied? → 同一 requestId 已处理跳过 │ │ └── validateHotPatchMonotonicity → 单调性校验 │ │ │ ├── Step 2e: CAS 写入 │ │ └── compareAndSetPatch(sessionId, expectedVersion, patch) │ │ ├── 成功 → 跳出循环 │ │ └── 失败 → 检查最新版本是否已包含同一 mutationId │ │ ├── 是 → 幂等命中安全跳过 │ │ └── 否 → backoff 后重试 │ │ │ └── Step 2f: CAS 重试退避 │ └── backoff baseDelay × (attempt 1) random(10~30ms) │ ├── Step 3: 条件触发冷快照更新 │ └── applyColdSnapshotIfNecessary(): │ ├── 如果 snapshotLevel ACTIVE 且冷快照已存在 → 跳过 │ └── 否则写入冷快照questions/suggestions/resumeContext 等 │ └── Step 4: 返回结果 ├── 成功 → true └── CAS 重试耗尽 → false打 warn 日志这里有几个值得注意的设计决策为什么每轮都重新组装状态因为在 CAS 重试循环中当前线程可能在等待期间被其他线程抢先更新了快照。每次循环都重新从 Redis 和 DB 读取最新数据确保构建出来的 Patch 是基于最新版本的状态。为什么 recentTurns 限制为 20 轮热快照的recentTurns只需要保留最近 N 轮用于快速恢复上下文。全量历史由 Turn Archive 承担。这避免了热快照文档随面试推进无限膨胀。2.5 Patch 而非整包覆盖每次更新不是把整个文档重写而是通过HotPatch/ColdPatch做字段级增量更新。以HotSnapshotRepositoryImpl为例它构建的 MongoUpdate对象只 set 有值的字段privateUpdatebuildUpdate(StringsessionId,HotPatchpatch,Datenow,booleanwithOnInsert){UpdateupdatenewUpdate();if(withOnInsert){update.setOnInsert(sessionId,sessionId);// upsert 时设置update.setOnInsert(createTime,now);}// 逐字段按需 setif(patch.getFlow()!null)update.set(flow,patch.getFlow());if(patch.getScoreAggregate()!null)update.set(scoreAggregate,patch.getScoreAggregate());if(patch.getRecentTurns()!null)update.set(recentTurns,patch.getRecentTurns());if(patch.getArchiveWatermark()!null)update.set(archiveWatermark,patch.getArchiveWatermark());if(patch.getLastMutationId()!null)update.set(lastMutationId,patch.getLastMutationId());// ... 其余字段同理update.set(updateTime,now);returnupdate;}冷快照的 Patch 逻辑类似但更新的字段是低频材料questions / suggestions / resumeContext / demeanorScore 等并且采用 upsert 语义——如果文档不存在则创建存在则按字段更新。Patch 的好处避免并发场景下后写覆盖前写——每次 Patch 只更新变化的字段不触碰其他字段减少 Mongo 写入量——不需要每次都把整个文档序列化并传输天然支持热冷分层——热快照和冷快照独立 Patch互不干扰2.6 CAS 并发保护热快照的更新采用Compare-And-Set语义这是保证并发安全的核心publicbooleancompareAndSetPatch(StringsessionId,LongexpectedVersion,HotPatchpatch){QueryqueryQuery.query(Criteria.where(sessionId).is(sessionId).and(snapshotVersion).is(expectedVersion)// CAS 条件);UpdateupdatebuildUpdate(sessionId,patch,now,false);UpdateResultresultmongoTemplate.updateFirst(query,update,HotSnapshot.class);returnresult.getModifiedCount()0;}只有当前版本号snapshotVersion与预期一致时才会写入成功。如果并发请求导致版本号已经前进CAS 会失败随后触发退避重试参数值说明最大重试次数3避免无限重试基础退避20ms × (attempt 1)线性递增随机抖动1030ms避免多个线程同时重试惊群效应CAS 保证了即使多个线程同时在刷新热快照也不会出现旧版本覆盖新版本的情况。为什么冷快照不需要 CAS冷快照的更新频率低出题完成、神态评分完成等关键节点并发冲突概率小。而且冷快照更新采用的是 upsert 字段级 Patch即使并发写入也不会互相覆盖只是可能重复设置相同值。这是一种有意识的简化——用最小的复杂度覆盖最大的风险。2.7 单调性保护在 CAS 之前还有一层validateHotPatchMonotonicity校验确保关键业务指标不会回退privatevoidvalidateHotPatchMonotonicity(HotSnapshotcurrent,HotPatchpatch){// 1. 轮次序号不能回退if(patch.getLastTurnSeq()current.getLastTurnSeq())thrownewIllegalStateException(hot snapshot lastTurnSeq regressed);// 2. 归档水位不能回退if(patch.getArchiveWatermark()current.getArchiveWatermark())thrownewIllegalStateException(hot snapshot archiveWatermark regressed);// 3. 计分次数不能减少if(patch.getScoreAggregate().getScoreCount()current.getScoreAggregate().getScoreCount())thrownewIllegalStateException(hot snapshot scoreCount regressed);// 4. 流程进度不能倒退非终态下if(!current.getFlow().isCompleted()patch.getFlow().getCurrentIndex()current.getFlow().getCurrentIndex())thrownewIllegalStateException(hot snapshot flow index regressed);}如果检测到回退直接抛出IllegalStateException阻止脏数据写入。这层校验相当于在 CAS 的版本号一致性之上又加了一层业务语义一致性的保护。为什么需要两层保护CAS 只能保证我的更新是基于最新版本做的但不能保证我构建出来的 Patch 内容本身是正确的。比如某个旧请求在重试时基于过期的 Redis 数据构建了一个 flow虽然 CAS 会拦住它版本号不对但如果恰好版本号一致极端场景单调性校验就是最后一道防线。2.8 幂等补偿热快照中记录了lastMutationId通常是 requestId。在刷新前和 CAS 失败后都会检查privatebooleanisMutationAlreadyApplied(HotSnapshothotSnapshot,StringmutationId){returnStrUtil.equals(hotSnapshot.getLastMutationId(),mutationId);}第一次检查CAS 之前如果当前快照的lastMutationId已经等于本次 requestId说明之前的请求已经成功写入直接跳过。第二次检查CAS 失败之后如果 CAS 失败重新加载最新版本的快照再做一次lastMutationId检查。如果最新版本的 mutationId 与当前请求一致说明另一个并发线程已经成功写入了同一个变更当前请求可以安全跳过。这保证了即使前端超时重试同一个 requestId 不会导致状态被重复推进。2.9 跳过快照更新当新旧快照的所有关键字段完全一致时系统会跳过无意义的写入privatebooleanshouldSkipHotPatch(HotSnapshotcurrent,HotPatchpatch){returnObjects.equals(current.getUserId(),patch.getUserId())StrUtil.equals(current.getSessionStatus(),patch.getSessionStatus())Objects.equals(current.getFlow(),patch.getFlow())Objects.equals(current.getScoreAggregate(),patch.getScoreAggregate())Objects.equals(current.getRecentTurns(),patch.getRecentTurns())// ... 全部字段对比;}这避免了在防抖窗口合并后、或重试场景下执行完全重复的 Mongo 写入。虽然 Mongo 的 upsert 本身是幂等的但跳过无意义的写入可以减少 IO 开销和版本号递增。三、轮次归档与软回放幂等3.1 Turn Archive 的设计每次答题成功提交后除了更新热快照的recentTurns还会向Turn Archive追加一条归档记录。归档实体结构非常精简Document(collectioninterview_session_turn_archive)publicclassInterviewSessionTurnArchive{IdprivateStringid;IndexedprivateStringsessionId;// 会话标识IndexedprivateStringrequestId;// 请求标识幂等键IndexedprivateLongseq;// 单调递增序号privateLongsnapshotVersion;// 关联的快照版本privateInterviewTurnLogturnPayload;// 完整轮次数据CreatedDateprivateDatecreatedAt;}Turn Archive 是全量追加、永不修改的。它承担两个职责全量历史回放恢复时可以按seq升序加载所有轮次得到完整的面试对话历史软回放幂等通过requestId判断某个请求是否已经成功处理过归档写入本身也做了幂等保护privateLongarchiveTurn(StringsessionId,StringrequestId,InterviewTurnLogturn,Longversion){// 先检查同一 requestId 是否已归档if(StrUtil.isNotBlank(requestId)){OptionalTurnArchiveexistingturnArchiveRepo.findBySessionIdAndRequestId(sessionId,requestId);if(existing.isPresent()){returnexisting.get().getSeq();// 已归档直接返回已有 seq}}// 否则追加新归档seq 从最大 seq 1 开始longnextSeqturnArchiveRepo.findFirstBySessionIdOrderBySeqDesc(sessionId).map(a-a.getSeq()1L).orElse(1L);// ... 保存returnnextSeq;}3.2 软回放幂等的三重匹配当重复请求进来时findReplayResponse(...)会通过三重机制识别重复findReplayTurn(snapshot, requestId, questionNumber, answerContent) │ ├── 第一重按 requestId 匹配 recentTurns │ └── 从后往前遍历 recentTurns查找 requestId 一致的 turn │ └── 命中 → 直接回放该 turn 的结果 │ ├── 第二重按 turnDigest 匹配 recentTurns │ └── SHA256(题号 | 答案前1000字符) │ └── 从后往前遍历 recentTurns查找 digest 一致的 turn │ └── 命中 → 直接回放 │ └── 第三重按 lastCommittedTurnDigest 匹配 Archive └── 如果热快照的 lastCommittedTurnDigest 与当前请求一致 └── 加载 Turn Archive 的最后一条记录 └── 命中 → 回放该归档的结果三重匹配的设计考虑了不同的降级场景第一重依赖 requestId最常见也最可靠第二重依赖内容摘要在 requestId 丢失时兜底第三重依赖热快照的摘要字段 归档在 recentTurns 被截断时兜底其中turnDigest的计算方式值得注意——它取题号和答案内容的前 1000 个字符做 SHA256privateStringbuildTurnDigest(StringquestionNumber,StringanswerContent){returnDigestUtil.sha256Hex(normalizeQuestionNumber(questionNumber)|truncateAnswer(answerContent));}privateStringtruncateAnswer(StringanswerContent){returnanswerContentnull?:answerContent.length()1000?answerContent:answerContent.substring(0,1000);}截取 1000 字符是为了平衡匹配精度和性能。在正常业务场景下同一个题号 同一个用户的答案内容前 1000 字符已经足够唯一。3.3 回放响应的构建一旦匹配到已处理的 turn系统会直接从 turn 的数据构建完整响应而不是重新执行答题链路privateInterviewAnswerRespDTObuildReplayResponse(InterviewTurnLogturn){InterviewAnswerRespDTOresponseInterviewAnswerRespDTO.init();response.withCurrentQuestion(turn.getQuestionNumber(),turn.getQuestionContent());response.withEvaluation(turn.getScore(),turn.getFeedback(),turn.getTotalScore());if(Boolean.TRUE.equals(turn.getFinished())){response.finish().success();returnresponse;}response.withNextQuestion(turn.getNextQuestionNumber(),turn.getNextQuestion(),isFollowUp,followUpCount).success();returnresponse;}这保证了即使用户因为网络超时重试系统也不会重新评估答案、重新推进流程、重新计分——而是精准地回放上一次的结果。四、架构总览将上下两篇讲解的所有组件放在一起整个状态治理体系的协作关系如下┌──────────────────────────┐ │ 业务请求入口 │ │ (Answer Pipeline 等) │ └────────────┬─────────────┘ │ ┌──────────▼──────────┐ │ ensureRuntime(...) │ ◄── 恢复机制入口上篇 │ (RehydrateService) │ └──────────┬──────────┘ │ ┌───────────────┴───────────────┐ │ isRuntimeReady(scope)? │ ├── YES ──► CACHE EXACT │ └── NO │ │ │ ┌────────▼────────┐ │ │ 分布式锁竞争 │ │ └────────┬────────┘ │ ┌──────┴──────┐ │ Owner│ │Follower │ │ │ │ ┌────────▼───┐ ┌─────▼────────┐ │ │ 优先 Snapshot│ │ 轮询等待 │ │ │ 恢复 │ │ (4×80ms) │ │ └──────┬──────┘ └─────┬────────┘ │ │ │ │ ┌──────▼───┐ │ │ │ 降级材料 │ │ │ │ 推导恢复 │ │ │ └──────┬───┘ │ │ │ │ │ ▼ ▼ │ 写回 Redis 返回 RuntimeView │ │ ──────────── 业务继续推进 ──────────────────────── │ │ ┌──────────────────────────┐ │ │ 业务成功后触发刷新 │ │ │ refreshAfterXxx(...) │ ◄┘ 更新机制入口本篇 └────────────┬─────────────┘ │ ┌──────────▼──────────┐ │ HotRefreshCoordinator│ ◄── 防抖 聚合 │ (按 session 聚合) │ └──────────┬──────────┘ │ ┌──────────▼──────────┐ │ refreshSnapshot(...) │ │ ┌─────────────────┐ │ │ │ Patch CAS │ │ │ │ 单调性校验 │ │ │ │ 幂等补偿 │ │ │ └─────────────────┘ │ └──────────┬──────────┘ │ ┌────────────┼────────────┐ │ │ │ Hot Snapshot Cold Snapshot Turn Archive (CAS 更新) (按需更新) (追加写入)五、设计总结可复用的七条核心原则从这套实现中可以提炼出适用于任何长会话系统的通用原则原则一承认运行态会缺失提前设计好恢复入口不要幻想 Redis 永不失手。只要会话足够长、请求足够多状态缺口几乎必然出现。关键是提前准备好恢复入口统一的ensureRuntime(...)所有业务请求必须先过这一关恢复依据Hot Snapshot Cold Snapshot Turn Archive 三级检查点恢复边界Confidence 告诉你恢复结果可不可靠Scope 告诉你恢复范围够不够原则二热冷分层降低写放大高频变化的状态flow、score、turns和低频变化的材料questions、resume context分开存储。每次业务推进只更新热快照避免无谓的冷数据重写。热快照走 CAS 精细控制冷快照走 upsert 宽松更新。原则三差量更新Patch代替整包覆盖用字段级 Patch 代替整个文档 rewrite。好处是避免并发场景下后写覆盖前写减少 Mongo 写入量和网络传输天然支持热冷分层——热快照和冷快照独立 Patch原则四CAS 单调性校验双重并发保护CAS 保证版本号一致性单调性校验保证业务语义一致性。两者结合即使多个线程同时在刷新热快照也不会出现旧版本覆盖新版本或状态回退的情况。原则五幂等补偿兜底重试场景通过lastMutationIdrequestIdturnDigest三重机制识别重复请求保证超时重试不会导致状态被重复推进。在 CAS 失败后也做一次幂等检查避免误判为版本冲突而错误重试。原则六Owner-Follower 避免并发恢复浪费同一 session 的并发恢复请求只需要一个 owner 执行恢复其他 follower 等待并复用结果。这在高并发场景下尤为重要——它避免了重复的 Mongo 查询、Redis 写入和状态计算。原则七恢复结果带置信度上层按级处理恢复机制不返回简单的 boolean而是返回带有 Confidence 和 RestoreSource 的视图对象。上层业务可以根据置信度决定是否继续写入EXACT/DERIVED→ 可以继续推进业务READ_ONLY→ 只能提供查询不能推进TERMINAL→ 会话已结束直接回放这避免了在不可靠的状态上做出不可逆的业务决策。六、写在最后这套方案最终解决的问题不是永远不丢状态而是**“丢了也能恢复”**。它把一个看似不可能完成的任务——保证长会话运行态永远完整——转化成了一个可工程化落地的方案只要检查点还在恢复入口就能把记忆找回来让这场会话正确地走完。从一个更高的视角来看这套方案的本质不是某一个技术点的巧妙运用而是一种对长会话状态本质的认知升级长会话运行态是一种高脆弱、高时序、高并发敏感的数据形态不能用普通缓存思路去管理它必须建立一套可恢复的长会话状态治理体系——热层承接高频读写持久层沉淀恢复材料懒恢复入口保障兜底并发保护防止写乱幂等补偿兜底重试理解了这一点热冷分层、懒恢复、Patch CAS、单调性校验和幂等补偿就不再是一堆零散的技术技巧而是一套完整方案中不可或缺的组成部分。