1. 项目概述当ChatGPT遇上CQRS最近在设计和实现一个需要与大型语言模型LLM深度集成的系统时我遇到了一个典型的架构挑战如何优雅地处理用户与AI之间复杂的、状态化的交互流程比如一个用户向ChatGPT发出一个指令“帮我分析上周的销售数据并生成报告”这背后可能涉及多个步骤——查询数据库、执行计算、生成文本、甚至调用外部API。如果所有逻辑都塞在一个庞大的服务里代码很快就会变得难以维护尤其是当需要区分“用户指令的解析”查询和“报告生成任务的执行”命令时。这正是CQRS命令查询职责分离模式大显身手的地方。这个项目我称之为“ChatGPT Implements Work With Users Using the CQRS Pattern”核心就是探讨如何将CQRS的思想应用于构建与ChatGPT或同类LLM协作的、健壮的后端服务。它不是简单地在ChatGPT外面套一个API网关而是从领域逻辑层面将用户与AI的交互清晰地分解为“命令”触发一个需要改变系统状态或执行复杂流程的动作和“查询”获取信息、请求解释或进行简单对话。通过这种分离我们能够获得更好的性能、更清晰的代码结构以及应对复杂工作流的强大能力。无论你是在构建AI客服、智能助手后台还是任何需要将LLM作为核心“工作者”集成到业务系统中的开发者这套思路都能为你提供一个坚实且可扩展的架构蓝图。2. 核心架构思路为什么CQRS是AI集成的绝配在深入代码之前我们必须先理解为什么CQRS模式特别适合处理与ChatGPT的交互。传统的CRUD架构在面对LLM时常常显得力不从心主要原因在于LLM交互的本质是异步的、有状态的且包含明显的“意图”与“执行”的分离。2.1 传统CRUD的瓶颈与CQRS的优势想象一个简单的场景用户请求“总结我的未读邮件”。在CRUD模型下一个控制器可能同时负责1验证用户身份和权限命令侧2调用邮件API获取数据查询侧3构造Prompt发送给ChatGPT命令侧4等待AI响应并返回查询侧5可能还要将结果缓存或记录日志命令侧。这种混杂的职责使得服务难以测试、扩展并且当AI处理耗时较长时会阻塞整个HTTP请求线程。CQRS通过强制性的职责分离解决了这些问题命令Command负责“做某事”会改变系统状态。它应该是异步的、无返回值的或仅返回一个任务ID。在我们的上下文中所有触发AI执行具体任务、写数据库、调用外部写操作API的请求都是命令。例如“生成销售报告”、“根据对话历史更新用户画像”。查询Query负责“读数据”不会改变系统状态。它应该是同步的、快速返回结果的。例如“获取刚才那个报告生成的进度”、“列出我所有与AI的对话历史”、“向AI提出一个简单的知识性问题”。这种分离带来了几个直接好处模型优化命令模型和查询模型可以针对各自的工作负载进行独立优化。查询模型可以极度简化甚至直接映射到数据库的只读副本或缓存视图以实现毫秒级响应。复杂度管理将复杂的业务逻辑尤其是涉及多步AI调用和状态转换的隔离在命令端保持查询端的简单与稳定。伸缩性命令处理和查询处理可以独立伸缩。如果AI任务繁重可以横向扩展命令处理器如果查询请求量大可以增强查询端的缓存和数据库读副本。2.2 领域驱动设计DDD与CQRS的协同CQRS常常与事件溯源Event Sourcing结合但在这个项目中我们采用更轻量级、更实用的方法将CQRS与DDD的聚合根Aggregate Root和领域事件Domain Event结合。我们将用户与AI的一次“工作任务”Work Session视为一个聚合根。这个聚合根会接收各种命令如StartAnalysisCommandProvideAdditionalInfoCommand并发布相应的领域事件如AnalysisStartedEventAITaskDispatchedEventWorkCompletedEvent。这些领域事件是系统的脊梁。它们不仅用于在聚合内部驱动状态变化更重要的是它们会被发布到消息总线如RabbitMQ, Kafka从而触发后续的进程管理器Process Manager或** Saga**。进程管理器是协调复杂、长期运行工作流的核心模式它监听事件决定下一步该发送什么命令。这正是管理多步AI交互的理想抽象。注意不要一开始就引入事件溯源Event Sourcing。除非你有强烈的审计、时间旅行调试需求否则事件溯源的复杂性可能会超过其收益。对于大多数AI集成场景用领域事件驱动流程并用常规的数据库持久化聚合的“当前状态”是更务实的选择。3. 技术栈选型与核心组件设计基于上述架构思路我们选择了一套能够支撑高并发、异步处理的技术栈。选型的核心原则是解耦、异步、可观测。3.1 后端技术栈详解语言与框架我选择了C# / .NET 8与ASP.NET Core。.NET的强类型系统、优秀的异步编程模型async/await以及对依赖注入的原生支持非常适合构建结构清晰的领域模型。当然Java/Spring Boot Node.js/NestJS Python/FastAPI也都是绝佳选择核心模式是通用的。命令与查询总线使用MediatR库。它是一个轻量级的进程内中介者模式实现能完美地将命令/查询的发送与处理解耦。发送一个ICommand或IQuery由对应的IRequestHandler来处理无需知道具体实现。持久化命令侧使用Entity Framework Core或Dapper将聚合根的状态持久化到关系型数据库如PostgreSQL, SQL Server的“写库”。表结构围绕聚合根设计。查询侧使用Dapper或EF Core的只读上下文连接到一个只读副本数据库。查询模型是面向视图的可能是一张扁平化的表或者是一个专门优化的查询视图。消息总线与事件处理使用MassTransit或Brighter。它们建立在RabbitMQ或Azure Service Bus之上提供了强大的消息发布/订阅、重试、死信队列等功能用于发布领域事件和实现进程管理器。与ChatGPT集成使用OpenAI .NET SDK或Azure OpenAI SDK。关键是要将其封装在领域服务中而不是在处理器里直接调用。这个服务负责构造Prompt、处理Token限制、解析响应并返回结构化的结果。缓存Redis。用于缓存频繁的查询结果如常见的AI问答对、用户会话上下文以及作为进程管理器状态的临时存储。API网关与通信除了标准的RESTful API用于触发命令和简单查询强烈建议为需要实时进度更新的场景如报告生成提供SignalR支持实现服务器向客户端的主动推送。3.2 核心领域模型设计让我们定义一个核心聚合根WorkSession。public class WorkSession : AggregateRootGuid // 假设AggregateRoot是一个基类 { public Guid Id { get; private set; } public string UserId { get; private set; } public WorkSessionStatus Status { get; private set; } // e.g., Draft, Running, WaitingForInput, Completed, Failed public string CurrentObjective { get; private set; } // 当前任务目标 public ListConversationTurn ConversationHistory { get; private set; } // 对话历史 public Dictionarystring, object ContextData { get; private set; } // 上下文数据如已收集的信息 public string? FinalResult { get; private set; } // 最终结果 // 命令处理方法 public void StartAnalysis(StartAnalysisCommand command) { if (Status ! WorkSessionStatus.Draft) throw new InvalidOperationException(Session already started.); CurrentObjective command.Objective; Status WorkSessionStatus.Running; AddConversationTurn(user, command.UserInput); // 发布领域事件 AddDomainEvent(new AnalysisStartedEvent(Id, UserId, CurrentObjective)); AddDomainEvent(new AITaskDispatchedEvent(Id, InitialAnalysis, ConversationHistory)); } public void HandleAIResponse(AIResponseReceivedEvent event) { AddConversationTurn(assistant, event.ResponseContent); // 根据AI响应内容可能更新状态、发布新事件 if (event.SuggestsNextAction need_more_info) { Status WorkSessionStatus.WaitingForInput; AddDomainEvent(new WaitingForUserInputEvent(Id, event.RequiredInfo)); } else if (event.SuggestsNextAction complete) { FinalResult event.ResponseContent; Status WorkSessionStatus.Completed; AddDomainEvent(new WorkCompletedEvent(Id, FinalResult)); } } // ... 其他命令处理方法 }这个WorkSession聚合根是保证一致性的边界。所有改变其状态的操作都必须通过它的方法响应命令来完成。4. 命令端实现驱动AI工作流引擎命令端是整个系统的驱动者。它的职责是接收用户意图通过聚合根验证业务规则然后发布事件触发后续的AI调用和流程控制。4.1 命令处理器Command Handler的实现一个典型的命令处理器例如处理StartAnalysisCommandpublic class StartAnalysisCommandHandler : IRequestHandlerStartAnalysisCommand, Guid { private readonly IRepositoryWorkSession _sessionRepository; private readonly IPublishEndpoint _publishEndpoint; // MassTransit 接口 public async TaskGuid Handle(StartAnalysisCommand request, CancellationToken cancellationToken) { // 1. 创建或获取聚合根 var session WorkSession.StartNew(request.UserId, request.Objective, request.InitialInput); // 2. 持久化聚合根的新状态 await _sessionRepository.SaveAsync(session, cancellationToken); // 3. 发布聚合根产生的所有领域事件 foreach (var domainEvent in session.DomainEvents) { // 将领域事件转换为集成事件可选添加更多上下文 var integrationEvent new IntegrationEventWrapper(domainEvent); await _publishEndpoint.Publish(integrationEvent, cancellationToken); } session.ClearDomainEvents(); // 4. 返回会话ID客户端可以用它来查询进度 return session.Id; } }这里的关键是处理器不直接调用AI。它只负责更新领域状态并发布事件。AI调用是由监听这些事件的领域事件处理器来触发的。4.2 领域事件处理器与AI服务封装接下来我们创建一个处理器来响应AITaskDispatchedEventpublic class AITaskDispatchedEventHandler : IConsumerAITaskDispatchedEvent // MassTransit的消费者接口 { private readonly IAIService _aiService; private readonly IRepositoryWorkSession _sessionRepository; private readonly IPublishEndpoint _publishEndpoint; public async Task Consume(ConsumeContextAITaskDispatchedEvent context) { var event context.Message; var session await _sessionRepository.GetByIdAsync(event.SessionId); // 调用封装的AI服务 var aiResponse await _aiService.ProcessTaskAsync( event.TaskType, session.ConversationHistory, session.ContextData ); // 根据AI响应向聚合根发送一个新的“内部命令”通过发布事件 var responseEvent new AIResponseReceivedEvent( event.SessionId, aiResponse.Content, aiResponse.SuggestedNextAction, aiResponse.RequiredParameters ); await _publishEndpoint.Publish(responseEvent); } }而IAIService是一个领域服务它封装了所有与OpenAI API的交互细节public class OpenAIService : IAIService { private readonly IOpenAIClient _client; private readonly IPromptTemplateEngine _templateEngine; public async TaskAIResponse ProcessTaskAsync(string taskType, ListConversationTurn history, Dictionarystring, object context) { // 1. 根据任务类型选择Prompt模板 var promptTemplate GetTemplate(taskType); // 2. 使用上下文数据渲染Prompt var fullPrompt _templateEngine.Render(promptTemplate, new { History history, Context context }); // 3. 调用API注意处理速率限制、超时和重试 var chatRequest new ChatCompletionRequest { Messages BuildMessagesFromPrompt(fullPrompt), Model gpt-4, Temperature 0.7, MaxTokens 2000 }; var response await _client.GetChatCompletionAsync(chatRequest); // 4. 解析响应可能使用JSON模式如OpenAI的function calling来获取结构化输出 var structuredOutput ParseAIResponse(response.Choices.First().Message.Content); return new AIResponse { Content structuredOutput.Answer, SuggestedNextAction structuredOutput.NextAction, RequiredParameters structuredOutput.NeededInfo }; } }实操心得在Prompt模板中明确指示AI以特定JSON格式返回可以极大简化后续的解析逻辑。利用OpenAI的response_format参数或Function Calling特性能获得更稳定、结构化的输出。5. 查询端实现高效的数据读取与状态展示查询端的设计目标是快和简单。它不关心业务逻辑只关心如何以最合适的形式把数据呈现给客户端。5.1 查询模型与只读存储查询端的数据模型应该完全根据前端或客户端的需要来设计。例如一个WorkSessionProgressView-- 在只读副本数据库中的一个视图或表 CREATE VIEW vw_WorkSessionProgress AS SELECT ws.Id, ws.UserId, ws.Status, ws.CurrentObjective, ws.LastUpdatedAt, -- 计算字段如进度百分比这是一个简化示例实际进度可能更复杂 CASE WHEN ws.Status Completed THEN 100 WHEN ws.Status Running THEN 50 -- 可能从其他表计算 ELSE 0 END as ProgressPercentage, -- 最新的AI回复摘要 (SELECT TOP 1 Content FROM ConversationTurns WHERE SessionId ws.Id AND Role assistant ORDER BY TurnNumber DESC) as LastAIMessage FROM WriteDatabase.WorkSessions ws; -- 假设从写库同步过来这个视图扁平化了WorkSession聚合及其相关的ConversationTurn查询效率极高。5.2 查询处理器与缓存策略对应的查询处理器非常简单public class GetSessionProgressQueryHandler : IRequestHandlerGetSessionProgressQuery, WorkSessionProgressDto { private readonly IQuerySessionRepository _queryRepo; private readonly IDistributedCache _cache; public async TaskWorkSessionProgressDto Handle(GetSessionProgressQuery request, CancellationToken ct) { var cacheKey $session_progress:{request.SessionId}; // 尝试从缓存读取 var cached await _cache.GetStringAsync(cacheKey, ct); if (cached ! null) { return JsonSerializer.DeserializeWorkSessionProgressDto(cached); } // 缓存未命中查询数据库 var progress await _queryRepo.GetProgressAsync(request.SessionId, ct); if (progress ! null) { // 写入缓存设置较短的过期时间因为进度更新频繁 await _cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(progress), new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow TimeSpan.FromSeconds(30) // 30秒后过期 }, ct); } return progress; } }对于对话历史这种可能较长的数据可以采用分页查询并且只缓存最近的N条。5.3 实时进度更新SignalR集成对于长时间运行的AI任务轮询查询进度对用户体验不友好。我们可以使用SignalR在状态发生变化时主动推送。在命令端当WorkSession的状态发生变化并发布WorkSessionUpdatedEvent时一个专门的事件处理器会捕获这个事件并通过SignalR Hub通知连接到该会话的所有客户端。public class WorkSessionUpdatedEventHandler : IConsumerWorkSessionUpdatedEvent { private readonly IHubContextWorkSessionHub _hubContext; public async Task Consume(ConsumeContextWorkSessionUpdatedEvent context) { var event context.Message; // 通知该会话的所有客户端 await _hubContext.Clients.Group(event.SessionId.ToString()) .SendAsync(ProgressUpdated, new { event.SessionId, event.NewStatus, event.Progress }); } }客户端在发起任务后连接到Hub并加入以SessionId命名的组即可实时接收更新。6. 进程管理器编排复杂多步AI工作流这是整个架构中最能体现价值的部分。当用户的任务需要多个AI调用步骤并且中间可能需要用户介入时一个简单的“发布-订阅”事件链会变得难以管理。进程管理器或Saga就是用来协调这种长期运行业务流程的模式。假设我们有一个“数据获取-分析-报告生成”的工作流用户请求分析销售数据。AI需要先查询数据库获取原始数据命令1。拿到数据后AI进行分析命令2。分析结果需要用户确认某个指标。用户确认后AI生成最终报告命令3。6.1 进程管理器的状态机实现我们可以使用状态机来建模这个流程。这里使用MassTransit的Automatonymous库来定义一个ReportGenerationSagapublic class ReportGenerationSagaState : SagaStateMachineInstance { public Guid CorrelationId { get; set; } // 对应 WorkSession Id public string CurrentState { get; set; } public string Objective { get; set; } public string RawData { get; set; } public string AnalysisResult { get; set; } public bool UserConfirmed { get; set; } } public class ReportGenerationSaga : MassTransitStateMachineReportGenerationSagaState { // 定义事件 public EventAnalysisStartedEvent AnalysisStarted { get; private set; } public EventDataFetchedEvent DataFetched { get; private set; } public EventAnalysisCompletedEvent AnalysisCompleted { get; private set; } public EventUserConfirmationReceivedEvent UserConfirmationReceived { get; private set; } public EventReportGeneratedEvent ReportGenerated { get; private set; } // 定义状态 public State AwaitingDataFetch { get; private set; } public State AwaitingAnalysis { get; private set; } public State AwaitingUserConfirmation { get; private set; } public State AwaitingReportGeneration { get; private set; } public State Completed { get; private set; } public ReportGenerationSaga() { InstanceState(x x.CurrentState); // 流程起点分析开始事件 Initially( When(AnalysisStarted) .Then(context { context.Instance.Objective context.Data.Objective; }) .PublishAsync(context context.InitFetchDataCommand(new { SessionId context.Instance.CorrelationId, // 根据Objective构造查询参数... })) .TransitionTo(AwaitingDataFetch) ); // 步骤1数据获取完成 During(AwaitingDataFetch, When(DataFetched) .Then(context context.Instance.RawData context.Data.RawDataJson) .PublishAsync(context context.InitPerformAnalysisCommand(new { SessionId context.Instance.CorrelationId, RawData context.Instance.RawData })) .TransitionTo(AwaitingAnalysis) ); // 步骤2分析完成需要用户确认 During(AwaitingAnalysis, When(AnalysisCompleted) .Then(context { context.Instance.AnalysisResult context.Data.Result; // 这里可以发布一个事件通知前端需要用户确认 }) // 发布一个事件让前端展示确认界面 .PublishAsync(context context.InitRequestUserConfirmationEvent(new { SessionId context.Instance.CorrelationId, Question 是否确认指标X, AnalysisResult context.Instance.AnalysisResult })) .TransitionTo(AwaitingUserConfirmation) ); // 步骤3收到用户确认生成报告 During(AwaitingUserConfirmation, When(UserConfirmationReceived) .Then(context context.Instance.UserConfirmed context.Data.Confirmed) .If(context context.Instance.UserConfirmed, thenBinder thenBinder .PublishAsync(context context.InitGenerateReportCommand(new { SessionId context.Instance.CorrelationId, AnalysisResult context.Instance.AnalysisResult })) .TransitionTo(AwaitingReportGeneration) ) .Else(/* 用户拒绝可以结束或回到上一步 */) ); // 步骤4报告生成完成流程结束 During(AwaitingReportGeneration, When(ReportGenerated) .Then(context { // 最终结果已生成可以更新WorkSession的FinalResult }) .Finalize() ); } }这个状态机清晰地定义了整个工作流的步骤、状态转换和触发条件。进程管理器持有工作流的状态ReportGenerationSagaState并监听相关事件在适当的时候发出新的命令来驱动流程向前。6.2 进程管理器的持久化与容错MassTransit会自动将Saga的状态持久化到配置的存储中如Redis, PostgreSQL, MongoDB。这意味着即使服务重启未完成的工作流也能从上次中断的状态恢复。这是构建可靠的长时运行AI工作流的关键。注意事项在设计Saga时要特别注意补偿事务。如果工作流中的某一步失败例如AI调用超时你需要有回滚或补偿机制。例如在“生成报告”失败后你可能需要发布一个CleanupTempDataCommand。这通常通过监听失败事件或设置超时器来实现。7. 部署、监控与性能考量将这样一个基于CQRS和事件驱动的系统部署到生产环境需要额外的考虑。7.1 部署拓扑建议将不同的组件部署为独立的微服务或至少是独立的进程以实现独立伸缩API网关服务处理HTTP请求发送命令和查询。命令处理服务运行MediatR命令处理器和领域逻辑。事件处理服务运行MassTransit消费者处理AI调用和业务逻辑。查询服务专门处理查询请求连接只读数据库副本。进程管理器服务运行Saga状态机实例。这些服务通过消息总线RabbitMQ/Kafka和数据库进行通信。数据库层面需要设置主从复制将写操作指向主库读操作指向从库。7.2 可观测性分布式系统的调试离不开强大的可观测性。日志结构化日志如Serilog Seq/ELK。在每个命令、事件、AI调用的边界记录日志并包含唯一的CorrelationId通常是WorkSessionId以便追踪整个工作流。指标使用Prometheus和Grafana监控关键指标命令/查询的吞吐量与延迟、AI API的调用次数与延迟、消息队列的积压情况、各服务的内存/CPU使用率。分布式追踪使用OpenTelemetry将跨服务的调用链串联起来可视化一个用户请求从API网关到命令处理再到AI调用和事件处理的完整路径。7.3 性能与伸缩性要点命令端的异步非阻塞确保所有I/O操作数据库、AI调用、消息发布都是异步的避免阻塞线程池线程。查询端的缓存策略针对不同数据特点采用多级缓存。会话元信息如状态、进度可以缓存在Redis中并设置较短TTL。历史对话记录可以分页缓存。静态的、通用的AI回答可以缓存更长时间。AI调用优化批处理如果可能将多个小的、独立的Prompt合并成一个批处理请求发送给AI API以减少网络往返和利用Token效率。流式响应对于生成长文本的场景使用OpenAI的流式响应streaming并将内容通过SignalR分块推送给前端提升用户体验。速率限制与退避严格遵守AI服务的速率限制实现带指数退避的智能重试机制。消息总线配置根据事件类型设置不同的队列和交换器。高优先级的事件如用户实时交互使用独立队列确保低延迟。批量处理的事件可以使用工作队列模式。8. 常见问题与排查技巧实录在实际开发和运维中我遇到了不少典型问题这里分享一些排查思路和解决方案。问题现象可能原因排查步骤与解决方案用户发送命令后长时间无响应查询状态一直是“Running”。1. 命令事件未发布。2. 事件处理器消费失败。3. AI服务调用超时或失败。4. 进程管理器状态卡住。1.检查日志查看命令处理器日志确认AnalysisStartedEvent是否成功发布。使用消息队列的管理界面查看对应队列是否有消息堆积。2.追踪事件通过CorrelationId在分布式追踪系统中查看事件流在哪里中断。3.检查AI服务查看AI服务调用的日志和指标是否有429限速或5xx错误。检查Prompt是否构造正确Token是否超限。4.检查Saga状态查询Saga状态存储如数据库中的ReportGenerationSagaState表看当前状态是否与预期一致。查询端返回的数据不是最新的。1. 数据库主从同步延迟。2. 查询缓存未及时失效。1.监控复制延迟监控数据库的复制延迟指标。对于一致性要求极高的查询可以考虑“写后读”模式即命令处理完成后将关键数据写入一个快速缓存如Redis查询端优先读缓存。2.精细化缓存失效在命令处理器中当聚合根状态改变时除了发布事件还应主动使相关缓存失效。例如在WorkSession状态更新后立即删除Redis中该会话的进度缓存键。AI返回的响应格式不符合预期导致后续流程解析失败。Prompt指令不清晰AI输出不稳定。1.强化Prompt工程在Prompt中使用更明确的指令例如“请严格按照以下JSON格式输出...”。使用OpenAI的response_format参数强制JSON输出。2.增加响应验证与重试在IAIService中对AI返回的内容进行强验证。如果解析失败可以尝试用另一个更严格的Prompt让AI修正输出或者记录错误并转入人工处理流程。进程管理器进入无法跳出的状态死锁。Saga状态机设计有缺陷某个预期事件永远无法发生。1.设计时加入超时处理为每个等待状态如AwaitingUserConfirmation设置超时事件。超时后可以发布一个补偿命令并将Saga状态置为“超时失败”通知用户。2.添加管理控制台构建一个内部管理界面可以查看所有运行中的Saga实例及其状态并允许管理员手动干预如强制发布某个事件或重置状态。在高并发下消息队列出现大量积压。事件处理器的处理速度跟不上命令的生成速度尤其是AI调用成为瓶颈。1.横向扩展事件处理器增加事件处理服务消费者的实例数量。2.优化AI调用如前所述考虑批处理、使用更快的模型如gpt-3.5-turbo、或引入请求队列在服务内部进行限流和调度。3.优先级队列将实时性要求不高的事件如日志记录、数据分析路由到低优先级队列确保核心业务事件优先处理。实操心得在开发初期就应投入精力搭建好结构化的日志和分布式追踪。当问题发生时能够通过一个SessionId快速拉取到跨所有服务的相关日志和追踪信息是快速定位问题的关键。另外对于AI集成项目一定要对第三方API的失败有充分的预案设计降级策略例如缓存旧答案、返回友好提示、转入人工队列等。
CQRS架构在ChatGPT集成中的应用:构建可扩展的AI工作流引擎
发布时间:2026/6/17 11:33:22
1. 项目概述当ChatGPT遇上CQRS最近在设计和实现一个需要与大型语言模型LLM深度集成的系统时我遇到了一个典型的架构挑战如何优雅地处理用户与AI之间复杂的、状态化的交互流程比如一个用户向ChatGPT发出一个指令“帮我分析上周的销售数据并生成报告”这背后可能涉及多个步骤——查询数据库、执行计算、生成文本、甚至调用外部API。如果所有逻辑都塞在一个庞大的服务里代码很快就会变得难以维护尤其是当需要区分“用户指令的解析”查询和“报告生成任务的执行”命令时。这正是CQRS命令查询职责分离模式大显身手的地方。这个项目我称之为“ChatGPT Implements Work With Users Using the CQRS Pattern”核心就是探讨如何将CQRS的思想应用于构建与ChatGPT或同类LLM协作的、健壮的后端服务。它不是简单地在ChatGPT外面套一个API网关而是从领域逻辑层面将用户与AI的交互清晰地分解为“命令”触发一个需要改变系统状态或执行复杂流程的动作和“查询”获取信息、请求解释或进行简单对话。通过这种分离我们能够获得更好的性能、更清晰的代码结构以及应对复杂工作流的强大能力。无论你是在构建AI客服、智能助手后台还是任何需要将LLM作为核心“工作者”集成到业务系统中的开发者这套思路都能为你提供一个坚实且可扩展的架构蓝图。2. 核心架构思路为什么CQRS是AI集成的绝配在深入代码之前我们必须先理解为什么CQRS模式特别适合处理与ChatGPT的交互。传统的CRUD架构在面对LLM时常常显得力不从心主要原因在于LLM交互的本质是异步的、有状态的且包含明显的“意图”与“执行”的分离。2.1 传统CRUD的瓶颈与CQRS的优势想象一个简单的场景用户请求“总结我的未读邮件”。在CRUD模型下一个控制器可能同时负责1验证用户身份和权限命令侧2调用邮件API获取数据查询侧3构造Prompt发送给ChatGPT命令侧4等待AI响应并返回查询侧5可能还要将结果缓存或记录日志命令侧。这种混杂的职责使得服务难以测试、扩展并且当AI处理耗时较长时会阻塞整个HTTP请求线程。CQRS通过强制性的职责分离解决了这些问题命令Command负责“做某事”会改变系统状态。它应该是异步的、无返回值的或仅返回一个任务ID。在我们的上下文中所有触发AI执行具体任务、写数据库、调用外部写操作API的请求都是命令。例如“生成销售报告”、“根据对话历史更新用户画像”。查询Query负责“读数据”不会改变系统状态。它应该是同步的、快速返回结果的。例如“获取刚才那个报告生成的进度”、“列出我所有与AI的对话历史”、“向AI提出一个简单的知识性问题”。这种分离带来了几个直接好处模型优化命令模型和查询模型可以针对各自的工作负载进行独立优化。查询模型可以极度简化甚至直接映射到数据库的只读副本或缓存视图以实现毫秒级响应。复杂度管理将复杂的业务逻辑尤其是涉及多步AI调用和状态转换的隔离在命令端保持查询端的简单与稳定。伸缩性命令处理和查询处理可以独立伸缩。如果AI任务繁重可以横向扩展命令处理器如果查询请求量大可以增强查询端的缓存和数据库读副本。2.2 领域驱动设计DDD与CQRS的协同CQRS常常与事件溯源Event Sourcing结合但在这个项目中我们采用更轻量级、更实用的方法将CQRS与DDD的聚合根Aggregate Root和领域事件Domain Event结合。我们将用户与AI的一次“工作任务”Work Session视为一个聚合根。这个聚合根会接收各种命令如StartAnalysisCommandProvideAdditionalInfoCommand并发布相应的领域事件如AnalysisStartedEventAITaskDispatchedEventWorkCompletedEvent。这些领域事件是系统的脊梁。它们不仅用于在聚合内部驱动状态变化更重要的是它们会被发布到消息总线如RabbitMQ, Kafka从而触发后续的进程管理器Process Manager或** Saga**。进程管理器是协调复杂、长期运行工作流的核心模式它监听事件决定下一步该发送什么命令。这正是管理多步AI交互的理想抽象。注意不要一开始就引入事件溯源Event Sourcing。除非你有强烈的审计、时间旅行调试需求否则事件溯源的复杂性可能会超过其收益。对于大多数AI集成场景用领域事件驱动流程并用常规的数据库持久化聚合的“当前状态”是更务实的选择。3. 技术栈选型与核心组件设计基于上述架构思路我们选择了一套能够支撑高并发、异步处理的技术栈。选型的核心原则是解耦、异步、可观测。3.1 后端技术栈详解语言与框架我选择了C# / .NET 8与ASP.NET Core。.NET的强类型系统、优秀的异步编程模型async/await以及对依赖注入的原生支持非常适合构建结构清晰的领域模型。当然Java/Spring Boot Node.js/NestJS Python/FastAPI也都是绝佳选择核心模式是通用的。命令与查询总线使用MediatR库。它是一个轻量级的进程内中介者模式实现能完美地将命令/查询的发送与处理解耦。发送一个ICommand或IQuery由对应的IRequestHandler来处理无需知道具体实现。持久化命令侧使用Entity Framework Core或Dapper将聚合根的状态持久化到关系型数据库如PostgreSQL, SQL Server的“写库”。表结构围绕聚合根设计。查询侧使用Dapper或EF Core的只读上下文连接到一个只读副本数据库。查询模型是面向视图的可能是一张扁平化的表或者是一个专门优化的查询视图。消息总线与事件处理使用MassTransit或Brighter。它们建立在RabbitMQ或Azure Service Bus之上提供了强大的消息发布/订阅、重试、死信队列等功能用于发布领域事件和实现进程管理器。与ChatGPT集成使用OpenAI .NET SDK或Azure OpenAI SDK。关键是要将其封装在领域服务中而不是在处理器里直接调用。这个服务负责构造Prompt、处理Token限制、解析响应并返回结构化的结果。缓存Redis。用于缓存频繁的查询结果如常见的AI问答对、用户会话上下文以及作为进程管理器状态的临时存储。API网关与通信除了标准的RESTful API用于触发命令和简单查询强烈建议为需要实时进度更新的场景如报告生成提供SignalR支持实现服务器向客户端的主动推送。3.2 核心领域模型设计让我们定义一个核心聚合根WorkSession。public class WorkSession : AggregateRootGuid // 假设AggregateRoot是一个基类 { public Guid Id { get; private set; } public string UserId { get; private set; } public WorkSessionStatus Status { get; private set; } // e.g., Draft, Running, WaitingForInput, Completed, Failed public string CurrentObjective { get; private set; } // 当前任务目标 public ListConversationTurn ConversationHistory { get; private set; } // 对话历史 public Dictionarystring, object ContextData { get; private set; } // 上下文数据如已收集的信息 public string? FinalResult { get; private set; } // 最终结果 // 命令处理方法 public void StartAnalysis(StartAnalysisCommand command) { if (Status ! WorkSessionStatus.Draft) throw new InvalidOperationException(Session already started.); CurrentObjective command.Objective; Status WorkSessionStatus.Running; AddConversationTurn(user, command.UserInput); // 发布领域事件 AddDomainEvent(new AnalysisStartedEvent(Id, UserId, CurrentObjective)); AddDomainEvent(new AITaskDispatchedEvent(Id, InitialAnalysis, ConversationHistory)); } public void HandleAIResponse(AIResponseReceivedEvent event) { AddConversationTurn(assistant, event.ResponseContent); // 根据AI响应内容可能更新状态、发布新事件 if (event.SuggestsNextAction need_more_info) { Status WorkSessionStatus.WaitingForInput; AddDomainEvent(new WaitingForUserInputEvent(Id, event.RequiredInfo)); } else if (event.SuggestsNextAction complete) { FinalResult event.ResponseContent; Status WorkSessionStatus.Completed; AddDomainEvent(new WorkCompletedEvent(Id, FinalResult)); } } // ... 其他命令处理方法 }这个WorkSession聚合根是保证一致性的边界。所有改变其状态的操作都必须通过它的方法响应命令来完成。4. 命令端实现驱动AI工作流引擎命令端是整个系统的驱动者。它的职责是接收用户意图通过聚合根验证业务规则然后发布事件触发后续的AI调用和流程控制。4.1 命令处理器Command Handler的实现一个典型的命令处理器例如处理StartAnalysisCommandpublic class StartAnalysisCommandHandler : IRequestHandlerStartAnalysisCommand, Guid { private readonly IRepositoryWorkSession _sessionRepository; private readonly IPublishEndpoint _publishEndpoint; // MassTransit 接口 public async TaskGuid Handle(StartAnalysisCommand request, CancellationToken cancellationToken) { // 1. 创建或获取聚合根 var session WorkSession.StartNew(request.UserId, request.Objective, request.InitialInput); // 2. 持久化聚合根的新状态 await _sessionRepository.SaveAsync(session, cancellationToken); // 3. 发布聚合根产生的所有领域事件 foreach (var domainEvent in session.DomainEvents) { // 将领域事件转换为集成事件可选添加更多上下文 var integrationEvent new IntegrationEventWrapper(domainEvent); await _publishEndpoint.Publish(integrationEvent, cancellationToken); } session.ClearDomainEvents(); // 4. 返回会话ID客户端可以用它来查询进度 return session.Id; } }这里的关键是处理器不直接调用AI。它只负责更新领域状态并发布事件。AI调用是由监听这些事件的领域事件处理器来触发的。4.2 领域事件处理器与AI服务封装接下来我们创建一个处理器来响应AITaskDispatchedEventpublic class AITaskDispatchedEventHandler : IConsumerAITaskDispatchedEvent // MassTransit的消费者接口 { private readonly IAIService _aiService; private readonly IRepositoryWorkSession _sessionRepository; private readonly IPublishEndpoint _publishEndpoint; public async Task Consume(ConsumeContextAITaskDispatchedEvent context) { var event context.Message; var session await _sessionRepository.GetByIdAsync(event.SessionId); // 调用封装的AI服务 var aiResponse await _aiService.ProcessTaskAsync( event.TaskType, session.ConversationHistory, session.ContextData ); // 根据AI响应向聚合根发送一个新的“内部命令”通过发布事件 var responseEvent new AIResponseReceivedEvent( event.SessionId, aiResponse.Content, aiResponse.SuggestedNextAction, aiResponse.RequiredParameters ); await _publishEndpoint.Publish(responseEvent); } }而IAIService是一个领域服务它封装了所有与OpenAI API的交互细节public class OpenAIService : IAIService { private readonly IOpenAIClient _client; private readonly IPromptTemplateEngine _templateEngine; public async TaskAIResponse ProcessTaskAsync(string taskType, ListConversationTurn history, Dictionarystring, object context) { // 1. 根据任务类型选择Prompt模板 var promptTemplate GetTemplate(taskType); // 2. 使用上下文数据渲染Prompt var fullPrompt _templateEngine.Render(promptTemplate, new { History history, Context context }); // 3. 调用API注意处理速率限制、超时和重试 var chatRequest new ChatCompletionRequest { Messages BuildMessagesFromPrompt(fullPrompt), Model gpt-4, Temperature 0.7, MaxTokens 2000 }; var response await _client.GetChatCompletionAsync(chatRequest); // 4. 解析响应可能使用JSON模式如OpenAI的function calling来获取结构化输出 var structuredOutput ParseAIResponse(response.Choices.First().Message.Content); return new AIResponse { Content structuredOutput.Answer, SuggestedNextAction structuredOutput.NextAction, RequiredParameters structuredOutput.NeededInfo }; } }实操心得在Prompt模板中明确指示AI以特定JSON格式返回可以极大简化后续的解析逻辑。利用OpenAI的response_format参数或Function Calling特性能获得更稳定、结构化的输出。5. 查询端实现高效的数据读取与状态展示查询端的设计目标是快和简单。它不关心业务逻辑只关心如何以最合适的形式把数据呈现给客户端。5.1 查询模型与只读存储查询端的数据模型应该完全根据前端或客户端的需要来设计。例如一个WorkSessionProgressView-- 在只读副本数据库中的一个视图或表 CREATE VIEW vw_WorkSessionProgress AS SELECT ws.Id, ws.UserId, ws.Status, ws.CurrentObjective, ws.LastUpdatedAt, -- 计算字段如进度百分比这是一个简化示例实际进度可能更复杂 CASE WHEN ws.Status Completed THEN 100 WHEN ws.Status Running THEN 50 -- 可能从其他表计算 ELSE 0 END as ProgressPercentage, -- 最新的AI回复摘要 (SELECT TOP 1 Content FROM ConversationTurns WHERE SessionId ws.Id AND Role assistant ORDER BY TurnNumber DESC) as LastAIMessage FROM WriteDatabase.WorkSessions ws; -- 假设从写库同步过来这个视图扁平化了WorkSession聚合及其相关的ConversationTurn查询效率极高。5.2 查询处理器与缓存策略对应的查询处理器非常简单public class GetSessionProgressQueryHandler : IRequestHandlerGetSessionProgressQuery, WorkSessionProgressDto { private readonly IQuerySessionRepository _queryRepo; private readonly IDistributedCache _cache; public async TaskWorkSessionProgressDto Handle(GetSessionProgressQuery request, CancellationToken ct) { var cacheKey $session_progress:{request.SessionId}; // 尝试从缓存读取 var cached await _cache.GetStringAsync(cacheKey, ct); if (cached ! null) { return JsonSerializer.DeserializeWorkSessionProgressDto(cached); } // 缓存未命中查询数据库 var progress await _queryRepo.GetProgressAsync(request.SessionId, ct); if (progress ! null) { // 写入缓存设置较短的过期时间因为进度更新频繁 await _cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(progress), new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow TimeSpan.FromSeconds(30) // 30秒后过期 }, ct); } return progress; } }对于对话历史这种可能较长的数据可以采用分页查询并且只缓存最近的N条。5.3 实时进度更新SignalR集成对于长时间运行的AI任务轮询查询进度对用户体验不友好。我们可以使用SignalR在状态发生变化时主动推送。在命令端当WorkSession的状态发生变化并发布WorkSessionUpdatedEvent时一个专门的事件处理器会捕获这个事件并通过SignalR Hub通知连接到该会话的所有客户端。public class WorkSessionUpdatedEventHandler : IConsumerWorkSessionUpdatedEvent { private readonly IHubContextWorkSessionHub _hubContext; public async Task Consume(ConsumeContextWorkSessionUpdatedEvent context) { var event context.Message; // 通知该会话的所有客户端 await _hubContext.Clients.Group(event.SessionId.ToString()) .SendAsync(ProgressUpdated, new { event.SessionId, event.NewStatus, event.Progress }); } }客户端在发起任务后连接到Hub并加入以SessionId命名的组即可实时接收更新。6. 进程管理器编排复杂多步AI工作流这是整个架构中最能体现价值的部分。当用户的任务需要多个AI调用步骤并且中间可能需要用户介入时一个简单的“发布-订阅”事件链会变得难以管理。进程管理器或Saga就是用来协调这种长期运行业务流程的模式。假设我们有一个“数据获取-分析-报告生成”的工作流用户请求分析销售数据。AI需要先查询数据库获取原始数据命令1。拿到数据后AI进行分析命令2。分析结果需要用户确认某个指标。用户确认后AI生成最终报告命令3。6.1 进程管理器的状态机实现我们可以使用状态机来建模这个流程。这里使用MassTransit的Automatonymous库来定义一个ReportGenerationSagapublic class ReportGenerationSagaState : SagaStateMachineInstance { public Guid CorrelationId { get; set; } // 对应 WorkSession Id public string CurrentState { get; set; } public string Objective { get; set; } public string RawData { get; set; } public string AnalysisResult { get; set; } public bool UserConfirmed { get; set; } } public class ReportGenerationSaga : MassTransitStateMachineReportGenerationSagaState { // 定义事件 public EventAnalysisStartedEvent AnalysisStarted { get; private set; } public EventDataFetchedEvent DataFetched { get; private set; } public EventAnalysisCompletedEvent AnalysisCompleted { get; private set; } public EventUserConfirmationReceivedEvent UserConfirmationReceived { get; private set; } public EventReportGeneratedEvent ReportGenerated { get; private set; } // 定义状态 public State AwaitingDataFetch { get; private set; } public State AwaitingAnalysis { get; private set; } public State AwaitingUserConfirmation { get; private set; } public State AwaitingReportGeneration { get; private set; } public State Completed { get; private set; } public ReportGenerationSaga() { InstanceState(x x.CurrentState); // 流程起点分析开始事件 Initially( When(AnalysisStarted) .Then(context { context.Instance.Objective context.Data.Objective; }) .PublishAsync(context context.InitFetchDataCommand(new { SessionId context.Instance.CorrelationId, // 根据Objective构造查询参数... })) .TransitionTo(AwaitingDataFetch) ); // 步骤1数据获取完成 During(AwaitingDataFetch, When(DataFetched) .Then(context context.Instance.RawData context.Data.RawDataJson) .PublishAsync(context context.InitPerformAnalysisCommand(new { SessionId context.Instance.CorrelationId, RawData context.Instance.RawData })) .TransitionTo(AwaitingAnalysis) ); // 步骤2分析完成需要用户确认 During(AwaitingAnalysis, When(AnalysisCompleted) .Then(context { context.Instance.AnalysisResult context.Data.Result; // 这里可以发布一个事件通知前端需要用户确认 }) // 发布一个事件让前端展示确认界面 .PublishAsync(context context.InitRequestUserConfirmationEvent(new { SessionId context.Instance.CorrelationId, Question 是否确认指标X, AnalysisResult context.Instance.AnalysisResult })) .TransitionTo(AwaitingUserConfirmation) ); // 步骤3收到用户确认生成报告 During(AwaitingUserConfirmation, When(UserConfirmationReceived) .Then(context context.Instance.UserConfirmed context.Data.Confirmed) .If(context context.Instance.UserConfirmed, thenBinder thenBinder .PublishAsync(context context.InitGenerateReportCommand(new { SessionId context.Instance.CorrelationId, AnalysisResult context.Instance.AnalysisResult })) .TransitionTo(AwaitingReportGeneration) ) .Else(/* 用户拒绝可以结束或回到上一步 */) ); // 步骤4报告生成完成流程结束 During(AwaitingReportGeneration, When(ReportGenerated) .Then(context { // 最终结果已生成可以更新WorkSession的FinalResult }) .Finalize() ); } }这个状态机清晰地定义了整个工作流的步骤、状态转换和触发条件。进程管理器持有工作流的状态ReportGenerationSagaState并监听相关事件在适当的时候发出新的命令来驱动流程向前。6.2 进程管理器的持久化与容错MassTransit会自动将Saga的状态持久化到配置的存储中如Redis, PostgreSQL, MongoDB。这意味着即使服务重启未完成的工作流也能从上次中断的状态恢复。这是构建可靠的长时运行AI工作流的关键。注意事项在设计Saga时要特别注意补偿事务。如果工作流中的某一步失败例如AI调用超时你需要有回滚或补偿机制。例如在“生成报告”失败后你可能需要发布一个CleanupTempDataCommand。这通常通过监听失败事件或设置超时器来实现。7. 部署、监控与性能考量将这样一个基于CQRS和事件驱动的系统部署到生产环境需要额外的考虑。7.1 部署拓扑建议将不同的组件部署为独立的微服务或至少是独立的进程以实现独立伸缩API网关服务处理HTTP请求发送命令和查询。命令处理服务运行MediatR命令处理器和领域逻辑。事件处理服务运行MassTransit消费者处理AI调用和业务逻辑。查询服务专门处理查询请求连接只读数据库副本。进程管理器服务运行Saga状态机实例。这些服务通过消息总线RabbitMQ/Kafka和数据库进行通信。数据库层面需要设置主从复制将写操作指向主库读操作指向从库。7.2 可观测性分布式系统的调试离不开强大的可观测性。日志结构化日志如Serilog Seq/ELK。在每个命令、事件、AI调用的边界记录日志并包含唯一的CorrelationId通常是WorkSessionId以便追踪整个工作流。指标使用Prometheus和Grafana监控关键指标命令/查询的吞吐量与延迟、AI API的调用次数与延迟、消息队列的积压情况、各服务的内存/CPU使用率。分布式追踪使用OpenTelemetry将跨服务的调用链串联起来可视化一个用户请求从API网关到命令处理再到AI调用和事件处理的完整路径。7.3 性能与伸缩性要点命令端的异步非阻塞确保所有I/O操作数据库、AI调用、消息发布都是异步的避免阻塞线程池线程。查询端的缓存策略针对不同数据特点采用多级缓存。会话元信息如状态、进度可以缓存在Redis中并设置较短TTL。历史对话记录可以分页缓存。静态的、通用的AI回答可以缓存更长时间。AI调用优化批处理如果可能将多个小的、独立的Prompt合并成一个批处理请求发送给AI API以减少网络往返和利用Token效率。流式响应对于生成长文本的场景使用OpenAI的流式响应streaming并将内容通过SignalR分块推送给前端提升用户体验。速率限制与退避严格遵守AI服务的速率限制实现带指数退避的智能重试机制。消息总线配置根据事件类型设置不同的队列和交换器。高优先级的事件如用户实时交互使用独立队列确保低延迟。批量处理的事件可以使用工作队列模式。8. 常见问题与排查技巧实录在实际开发和运维中我遇到了不少典型问题这里分享一些排查思路和解决方案。问题现象可能原因排查步骤与解决方案用户发送命令后长时间无响应查询状态一直是“Running”。1. 命令事件未发布。2. 事件处理器消费失败。3. AI服务调用超时或失败。4. 进程管理器状态卡住。1.检查日志查看命令处理器日志确认AnalysisStartedEvent是否成功发布。使用消息队列的管理界面查看对应队列是否有消息堆积。2.追踪事件通过CorrelationId在分布式追踪系统中查看事件流在哪里中断。3.检查AI服务查看AI服务调用的日志和指标是否有429限速或5xx错误。检查Prompt是否构造正确Token是否超限。4.检查Saga状态查询Saga状态存储如数据库中的ReportGenerationSagaState表看当前状态是否与预期一致。查询端返回的数据不是最新的。1. 数据库主从同步延迟。2. 查询缓存未及时失效。1.监控复制延迟监控数据库的复制延迟指标。对于一致性要求极高的查询可以考虑“写后读”模式即命令处理完成后将关键数据写入一个快速缓存如Redis查询端优先读缓存。2.精细化缓存失效在命令处理器中当聚合根状态改变时除了发布事件还应主动使相关缓存失效。例如在WorkSession状态更新后立即删除Redis中该会话的进度缓存键。AI返回的响应格式不符合预期导致后续流程解析失败。Prompt指令不清晰AI输出不稳定。1.强化Prompt工程在Prompt中使用更明确的指令例如“请严格按照以下JSON格式输出...”。使用OpenAI的response_format参数强制JSON输出。2.增加响应验证与重试在IAIService中对AI返回的内容进行强验证。如果解析失败可以尝试用另一个更严格的Prompt让AI修正输出或者记录错误并转入人工处理流程。进程管理器进入无法跳出的状态死锁。Saga状态机设计有缺陷某个预期事件永远无法发生。1.设计时加入超时处理为每个等待状态如AwaitingUserConfirmation设置超时事件。超时后可以发布一个补偿命令并将Saga状态置为“超时失败”通知用户。2.添加管理控制台构建一个内部管理界面可以查看所有运行中的Saga实例及其状态并允许管理员手动干预如强制发布某个事件或重置状态。在高并发下消息队列出现大量积压。事件处理器的处理速度跟不上命令的生成速度尤其是AI调用成为瓶颈。1.横向扩展事件处理器增加事件处理服务消费者的实例数量。2.优化AI调用如前所述考虑批处理、使用更快的模型如gpt-3.5-turbo、或引入请求队列在服务内部进行限流和调度。3.优先级队列将实时性要求不高的事件如日志记录、数据分析路由到低优先级队列确保核心业务事件优先处理。实操心得在开发初期就应投入精力搭建好结构化的日志和分布式追踪。当问题发生时能够通过一个SessionId快速拉取到跨所有服务的相关日志和追踪信息是快速定位问题的关键。另外对于AI集成项目一定要对第三方API的失败有充分的预案设计降级策略例如缓存旧答案、返回友好提示、转入人工队列等。