前言在传统 C# 项目里我们经常会遇到一种场景业务系统 A 发生了一件事业务系统 B 也要知道呼叫中心来了一通电话客服客户端要马上弹屏订单状态变更后库存、短信、日志、报表都要跟着处理某个服务临时挂了消息不能丢等它恢复后还要继续处理。如果用最简单的方式很多人第一反应是客户端定时轮询数据库 或者 服务 A 直接调用服务 B 的接口这种方式在小项目里能跑但是一旦业务复杂起来就会出现很多问题1. 服务之间强耦合 2. 接口调用失败后不好补偿 3. 客户端频繁轮询浪费资源 4. 高并发时数据库压力很大 5. 某个服务挂了消息可能直接丢失 6. 后期扩展新业务时要不断改老代码这时候就需要引入一个非常重要的中间件消息队列。本文以阿里云 RocketMQ C#为例讲清楚1. RocketMQ 是什么 2. 为什么要用 RocketMQ 3. 阿里云 RocketMQ 怎么申请和配置 4. C# 如何发送消息 5. C# 如何消费消息 6. 在真实业务中应该怎么设计 7. 消息幂等、重试、死信这些底层知识怎么理解一、RocketMQ 是什么RocketMQ 是一个分布式消息中间件。更通俗一点说它就是系统之间的“消息中转站”。比如原来系统之间是这样调用的系统 A --- 系统 B如果系统 B 挂了系统 A 调用失败业务就可能中断。用了 RocketMQ 后系统变成这样系统 A --- RocketMQ --- 系统 B系统 A 不需要关心系统 B 当前是否在线只需要把消息投递给 RocketMQ。系统 B 恢复之后再从 RocketMQ 里把消息消费出来。这就是消息队列最核心的价值解耦、削峰、异步、可靠投递。二、RocketMQ 在业务系统中的典型场景1. 呼叫中心来电弹屏比如阿里云云联络中心有一通来电用户拨打 400 电话 ↓ 阿里云云联络中心收到来电 ↓ 事件推送到 RocketMQ ↓ 我们的 C# 后台服务消费消息 ↓ 通知 WinForms 客户端弹屏客户端不用一直去查数据库也不用一直问服务端“有没有新来电”。只要来电事件进入 RocketMQ我们服务端消费到后再精准通知对应坐席即可。2. 订单系统异步处理用户下单 ↓ 订单服务写入订单 ↓ 发送 OrderCreated 消息到 RocketMQ ↓ 库存服务扣库存 ↓ 短信服务发短信 ↓ 积分服务加积分 ↓ 报表服务统计数据订单服务不需要一个一个调用库存、短信、积分、报表接口。它只负责发一条消息后面的服务各自消费。3. 日志和审计业务系统产生操作日志 ↓ 发送到 RocketMQ ↓ 日志服务异步入库这样可以避免日志写入拖慢主业务接口。三、RocketMQ 的核心概念在使用 RocketMQ 之前必须先理解几个核心概念。1. Producer生产者生产者就是发送消息的一方。比如订单服务 呼叫中心服务 用户中心服务 支付服务它们都可以作为 Producer。代码层面就是SendMessageAsync(...)2. Consumer消费者消费者就是接收并处理消息的一方。比如短信服务 库存服务 弹屏服务 日志服务 报表服务它们从 RocketMQ 中拿到消息然后执行自己的业务逻辑。3. Topic主题Topic 是消息的分类。可以理解成数据库里的“表”或者业务上的“消息通道”。比如CallCenterEventTopic // 呼叫中心事件 OrderEventTopic // 订单事件 LogEventTopic // 日志事件发送消息时要指定 Topic。消费消息时也要订阅 Topic。4. ConsumerGroup消费者分组ConsumerGroup 是消费者分组。同一个 ConsumerGroup 里面的多个消费者会共同消费同一批消息。比如有 3 台服务都属于同一个分组CallCenterEventConsumerGroup ├── 服务实例 1 ├── 服务实例 2 └── 服务实例 3RocketMQ 会把消息分摊给这 3 个实例处理这就是负载均衡。如果不同的业务都要收到同一条消息就应该使用不同的 ConsumerGroup。比如CallCenterPopupGroup // 负责弹屏 CallCenterRecordGroup // 负责通话记录 CallCenterStatisticsGroup // 负责统计这样同一条来电消息可以被多个业务系统分别消费。5. Message消息Message 就是真正传输的数据。一条消息通常包含Topic MessageId Body Tag Key PropertiesBody 一般是 JSON。比如呼叫中心来电事件{ eventId: EVT202605290001, eventType: IncomingCall, callId: CALL202605290001, caller: 13800138000, callee: 4008888888, agentId: 1001, eventTime: 2026-05-29 10:00:00 }四、阿里云 RocketMQ 怎么申请下面以阿里云控制台为例。第一步开通云消息队列 RocketMQ 版进入阿里云控制台搜索云消息队列 RocketMQ 版然后开通服务。第二步创建实例进入 RocketMQ 控制台后创建实例。通常需要关注1. 地域 2. 实例类型 3. 网络类型 4. 是否需要公网访问 5. 规格和计费模式如果你的 C# 服务部署在阿里云 ECS 上建议优先使用 VPC 内网访问。如果你的 C# 服务部署在本地服务器或者其他云服务器上就需要使用公网接入点。第三步获取 EndpointEndpoint 就是 RocketMQ 的接入地址。示例格式类似rmq-cn-xxxx.cn-hangzhou.rmq.aliyuncs.com:8080注意区分公网接入点 VPC 内网接入点本地开发调试一般用公网接入点。正式生产环境如果服务部署在阿里云 ECS建议用 VPC 接入点。第四步创建 TopicTopic 是消息主题。比如我们创建一个呼叫中心事件主题CallCenterEventTopic也可以创建订单主题OrderEventTopic命名建议业务域 Event Topic例如CallCenterEventTopic OrderEventTopic UserEventTopic第五步创建 ConsumerGroup创建消费者分组。比如GID_CallCenterEventConsumer命名建议GID_业务名称_消费用途例如GID_CallCenter_Popup GID_CallCenter_Record GID_Order_Stock GID_Order_SMS第六步获取 AccessKey 和 SecretKeyC# 程序连接阿里云 RocketMQ 时需要认证信息。通常需要Endpoint InstanceId Topic ConsumerGroup AccessKey SecretKey这些信息不要写死在代码里建议放到appsettings.json 环境变量 配置中心生产环境不要把 AccessKey 和 SecretKey 放到前端客户端更不要提交到 Git 仓库。五、为什么不建议 WinForms 客户端直接连接 RocketMQ很多 C# 开发者第一反应可能是我 WinForms 客户端直接连 RocketMQ 不就行了吗不建议这样做。原因有几个1. AccessKey 和 SecretKey 会暴露在客户端 2. 客户端数量多连接和权限不好管理 3. WinForms 客户端断网、关机、异常退出都很常见 4. 消息消费应该由服务端保证可靠性 5. 客户端更适合接收服务端推送而不是直接消费 MQ正确架构应该是阿里云 RocketMQ ↓ C# 后台服务消费消息 ↓ 写数据库 ↓ 通过 WebSocket / SignalR / TCP 通知客户端 ↓ WinForms 弹屏也就是说RocketMQ 是服务端组件不是客户端组件。六、C# 项目准备建议使用.NET 6 或 .NET 8 Worker Service不建议使用.NET Framework 2.0 .NET Framework 4.0 .NET Framework 4.5 老项目直接硬接老项目可以通过 HTTP API、WebSocket、TCP 等方式和新的 MQ 服务通信。推荐架构老 WinForms 客户端 ↓ 你自己的业务服务 ↓ 新的 .NET 8 RocketMQ 消费服务 ↓ 阿里云 RocketMQ七、创建 .NET Worker Service创建项目dotnet new worker -n RocketMqDemo cd RocketMqDemo安装 RocketMQ SDKdotnet add package RocketMQ.Client如果项目中使用配置文件可以安装dotnet add package Microsoft.Extensions.Configuration dotnet add package Microsoft.Extensions.Configuration.Json dotnet add package Microsoft.Extensions.Hosting八、配置 appsettings.json新增配置文件{ RocketMq: { Endpoint: rmq-cn-xxxx.cn-hangzhou.rmq.aliyuncs.com:8080, InstanceId: rmq-cn-xxxx, Topic: CallCenterEventTopic, ConsumerGroup: GID_CallCenterEventConsumer, AccessKey: 你的AccessKey, SecretKey: 你的SecretKey } }生产环境建议使用环境变量覆盖敏感配置。例如RocketMq__AccessKey RocketMq__SecretKey九、定义配置类public class RocketMqOptions { public string Endpoint { get; set; } public string InstanceId { get; set; } public string Topic { get; set; } public string ConsumerGroup { get; set; } public string AccessKey { get; set; } public string SecretKey { get; set; } }十、定义业务事件模型以呼叫中心来电事件为例public class CallCenterEvent { public string EventId { get; set; } public string EventType { get; set; } public string CallId { get; set; } public string Caller { get; set; } public string Callee { get; set; } public string AgentId { get; set; } public DateTime EventTime { get; set; } }字段解释EventId事件唯一 ID用于幂等控制 EventType事件类型比如 IncomingCall、CallAnswered、CallHangup CallId通话唯一 ID Caller主叫号码 Callee被叫号码 AgentId坐席编号 EventTime事件发生时间十一、C# 发送消息示例下面是生产者代码。实际项目中生产者可以是你的业务系统也可以是测试工具。using System.Text; using System.Text.Json; using RocketMQ.Client; using RocketMQ.Client.Core; using RocketMQ.Client.Producer; using RocketMQ.Client.Message; public class RocketMqProducerService { private readonly RocketMqOptions _options; public RocketMqProducerService(RocketMqOptions options) { _options options; } public async Task SendCallCenterEventAsync(CallCenterEvent callEvent) { var credentialsProvider new StaticSessionCredentialsProvider( _options.AccessKey, _options.SecretKey ); var clientConfig new ClientConfig.Builder() .SetEndpoints(_options.Endpoint) .SetNamespace(_options.InstanceId) .SetCredentialsProvider(credentialsProvider) .Build(); var producer await new Producer.Builder() .SetClientConfig(clientConfig) .SetTopics(_options.Topic) .Build(); string json JsonSerializer.Serialize(callEvent); var message new Message.Builder() .SetTopic(_options.Topic) .SetBody(Encoding.UTF8.GetBytes(json)) .SetKeys(callEvent.EventId) .SetTag(callEvent.EventType) .Build(); var sendReceipt await producer.SendAsync(message); Console.WriteLine($消息发送成功MessageId{sendReceipt.MessageId}); } }代码解释StaticSessionCredentialsProvider用于配置 AccessKey 和 SecretKey。SetEndpoints(...)用于设置 RocketMQ 接入点。SetNamespace(...)用于设置实例 ID。Serverless 版公网访问时通常需要设置 InstanceId。SetTopics(...)用于指定生产者要发送到哪些 Topic。SetKeys(...)用于设置消息 Key建议放业务唯一标识比如 EventId、OrderId、CallId。SetTag(...)用于设置消息标签比如 IncomingCall、CallAnswered、CallHangup。后续消费者可以根据 Tag 过滤不同类型的消息。十二、C# 消费消息示例消费者负责从 RocketMQ 里读取消息并处理业务。using System.Text; using System.Text.Json; using RocketMQ.Client; using RocketMQ.Client.Core; using RocketMQ.Client.Consumer; public class RocketMqConsumerWorker : BackgroundService { private readonly RocketMqOptions _options; public RocketMqConsumerWorker(RocketMqOptions options) { _options options; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var credentialsProvider new StaticSessionCredentialsProvider( _options.AccessKey, _options.SecretKey ); var clientConfig new ClientConfig.Builder() .SetEndpoints(_options.Endpoint) .SetNamespace(_options.InstanceId) .SetCredentialsProvider(credentialsProvider) .Build(); var consumer await new SimpleConsumer.Builder() .SetClientConfig(clientConfig) .SetConsumerGroup(_options.ConsumerGroup) .SetSubscriptionExpressions(new Dictionarystring, FilterExpression { { _options.Topic, new FilterExpression(*) } }) .Build(); Console.WriteLine(RocketMQ 消费者已启动...); while (!stoppingToken.IsCancellationRequested) { try { var messages await consumer.ReceiveAsync( maxMessageNum: 16, invisibleDuration: TimeSpan.FromSeconds(30) ); foreach (var message in messages) { try { string json Encoding.UTF8.GetString(message.Body.ToArray()); Console.WriteLine(收到消息 json); var callEvent JsonSerializer.DeserializeCallCenterEvent(json); if (callEvent null) { await consumer.AckAsync(message); continue; } await HandleCallCenterEventAsync(callEvent); await consumer.AckAsync(message); } catch (Exception ex) { Console.WriteLine(处理消息失败 ex.Message); // 不 AckRocketMQ 后续会重新投递 } } } catch (Exception ex) { Console.WriteLine(拉取消息异常 ex.Message); await Task.Delay(3000, stoppingToken); } } } private async Task HandleCallCenterEventAsync(CallCenterEvent callEvent) { switch (callEvent.EventType) { case IncomingCall: Console.WriteLine($来电{callEvent.Caller}坐席{callEvent.AgentId}); break; case CallAnswered: Console.WriteLine($通话已接听CallId{callEvent.CallId}); break; case CallHangup: Console.WriteLine($通话已挂断CallId{callEvent.CallId}); break; case RecordingReady: Console.WriteLine($录音已生成CallId{callEvent.CallId}); break; default: Console.WriteLine($未知事件类型{callEvent.EventType}); break; } await Task.CompletedTask; } }十三、消费者代码重点解释1. ReceiveAsyncvar messages await consumer.ReceiveAsync(16, TimeSpan.FromSeconds(30));表示一次最多拉取 16 条消息。第二个参数表示消息的不可见时间。简单理解消费者拿到消息后这条消息会暂时对其他消费者不可见。 如果消费者处理成功并 Ack消息就算消费完成。 如果消费者没有 Ack超过不可见时间后消息会重新投递。2. AckAsyncawait consumer.AckAsync(message);Ack 表示确认消费成功。只有业务真正处理完成后才应该 Ack。不要一收到消息就 Ack否则业务处理失败时消息就丢了。正确顺序是1. 收到消息 2. 解析消息 3. 校验幂等 4. 执行业务逻辑 5. 写数据库 6. 通知客户端 7. Ack3. 处理失败不要 Ackcatch (Exception ex) { // 不 Ack等待 RocketMQ 重投 }这就是 RocketMQ 的可靠性来源之一。如果你的服务处理消息时崩溃了只要没有 Ack消息后续还能重新投递。十四、非常重要消息幂等设计RocketMQ 这类消息队列通常要按照“至少一次投递”来设计。意思是一条消息至少会投递一次但在异常情况下可能会投递多次。为什么会重复1. 消费者处理成功了但 Ack 失败 2. 消费者处理过程中网络断开 3. 消费者处理时间超过不可见时间 4. 服务重启导致消息重新投递所以业务系统必须做幂等。1. 建议数据库表CREATE TABLE MQ_Message_Log ( Id BIGINT IDENTITY(1,1) PRIMARY KEY, MessageKey NVARCHAR(100) NOT NULL, Topic NVARCHAR(100) NOT NULL, EventType NVARCHAR(50) NULL, BusinessId NVARCHAR(100) NULL, Body NVARCHAR(MAX) NULL, ConsumeStatus INT NOT NULL, CreateTime DATETIME NOT NULL DEFAULT GETDATE(), UpdateTime DATETIME NULL ); CREATE UNIQUE INDEX UX_MQ_Message_Log_MessageKey ON MQ_Message_Log(MessageKey);2. 消费前判断是否处理过private async Taskbool HasProcessedAsync(string eventId) { // 这里演示伪代码 // 实际项目中可以用 Dapper、EF Core、ADO.NET 查询数据库 string sql SELECT COUNT(1) FROM MQ_Message_Log WHERE MessageKey EventId AND ConsumeStatus 1; return await Task.FromResult(false); }3. 幂等消费流程private async Task HandleCallCenterEventAsync(CallCenterEvent callEvent) { bool processed await HasProcessedAsync(callEvent.EventId); if (processed) { Console.WriteLine($消息已处理过EventId{callEvent.EventId}); return; } switch (callEvent.EventType) { case IncomingCall: await HandleIncomingCallAsync(callEvent); break; case CallHangup: await HandleCallHangupAsync(callEvent); break; } await SaveMessageLogAsync(callEvent); }幂等设计的核心原则同一条业务消息重复消费多次最终结果应该和消费一次一样。比如同一个来电事件不能弹屏 3 次。同一个订单不能扣 3 次库存。同一个短信不能发 3 遍。十五、RocketMQ 的底层工作流程从整体链路看RocketMQ 的工作流程大概是Producer 创建消息 ↓ Producer 发送消息到 Broker ↓ Broker 持久化消息 ↓ Consumer 从 Broker 拉取消息 ↓ Consumer 执行业务逻辑 ↓ Consumer Ack 确认消费成功 ↓ Broker 标记消费进度这里面有几个核心组件1. BrokerBroker 是真正存储消息和投递消息的服务。Producer 发送消息本质上是把消息发送给 Broker。Consumer 消费消息本质上是从 Broker 获取消息。2. Topic 和 Queue一个 Topic 底层会有多个 Queue。可以理解为Topic 是逻辑分类 Queue 是实际存储和并发消费的分片多个 Queue 可以提高并发能力。3. 消费位点RocketMQ 会记录每个 ConsumerGroup 消费到哪里了。比如GID_CallCenter_Popup 消费到第 100 条 GID_CallCenter_Record 消费到第 80 条不同 ConsumerGroup 之间的消费进度互不影响。这也是为什么同一条消息可以被多个业务系统各自消费。4. 重试机制如果消费者处理失败没有 AckRocketMQ 会重新投递。这就是消费重试。但重试不是无限的。超过最大重试次数后消息可能进入死信队列。5. 死信队列死信队列用于存放一直消费失败的消息。比如消息格式错误 业务数据不存在 数据库字段超长 代码一直抛异常这些消息一直重试也没有意义就会进入死信队列。生产环境一定要监控死信队列。十六、真实项目推荐架构以呼叫中心为例推荐架构如下阿里云云联络中心 ↓ RocketMQ TopicCallCenterEventTopic ↓ .NET 8 Worker Service 消费消息 ↓ 写入 SQL Server ↓ 通过 WebSocket / SignalR / TCP 通知 WinForms 客户端 ↓ 客服客户端来电弹屏不建议这样WinForms 客户端直接连接 RocketMQ推荐这样WinForms 客户端 ↓ 你自己的业务服务 ↓ RocketMQ 消费服务 ↓ 阿里云 RocketMQ这样做的好处1. 客户端不暴露密钥 2. 消息消费集中管理 3. 可以统一做日志 4. 可以统一做幂等 5. 可以统一做异常重试 6. 可以兼容老系统 7. 后期可以扩展更多云厂商十七、C# 老项目怎么兼容很多公司还在使用.NET Framework 4.5 .NET Framework 4.0 甚至 .NET Framework 2.0这种情况下不建议在老项目里硬接 RocketMQ。更推荐新增一个现代化服务RocketMqEventService技术栈.NET 6 / .NET 8 Worker Service RocketMQ.Client SQL Server WebSocket / SignalR / HTTP API然后老项目通过 HTTP 或 TCP 调用这个新服务。架构如下老 WinForms 客户端 ↓ 老业务服务 ↓ RocketMqEventService ↓ 阿里云 RocketMQ这是一种比较稳妥的“旁路改造”方案。它不会大规模破坏老系统也能逐步引入现代化架构。十八、生产环境注意事项1. 不要把密钥写死错误做法string accessKey xxxxx; string secretKey xxxxx;推荐环境变量 配置中心 KMS 容器密钥2. 消费逻辑一定要幂等消息重复投递不是 RocketMQ 的 bug而是分布式系统的常态。只要用了消息队列就必须考虑幂等。3. 不要一收到消息就 Ack错误做法收到消息 ↓ 马上 Ack ↓ 再处理业务如果后面的业务失败了消息已经被确认无法自动重试。正确做法收到消息 ↓ 处理业务 ↓ 写数据库 ↓ 成功后 Ack4. 消费失败要记录日志消费失败时至少记录MessageId MessageKey Topic Tag Body 异常堆栈 失败时间 重试次数这样后期排查问题才有依据。5. 要监控消息堆积消息堆积说明消费者处理不过来了。常见原因1. 消费者服务挂了 2. 消费者处理速度太慢 3. 数据库性能瓶颈 4. 下游接口响应慢 5. 消费线程数量不足6. 合理拆分 Topic 和 ConsumerGroup不要所有业务都丢到一个 Topic。可以按业务域拆CallCenterEventTopic OrderEventTopic UserEventTopic LogEventTopic也不要多个完全不同的业务混用一个 ConsumerGroup。十九、RocketMQ 和数据库轮询对比对比项数据库轮询RocketMQ实时性较差较好系统耦合高低数据库压力大小失败重试需要自己做MQ 支持消息堆积能力弱强扩展性一般强适合场景小系统、低频任务中大型系统、事件驱动二十、总结RocketMQ 不是简单的“发消息工具”它更像是系统架构里的事件总线。在 C# 项目中引入 RocketMQ可以解决很多传统架构里的问题1. 服务之间解耦 2. 削峰填谷 3. 异步处理 4. 失败重试 5. 消息可靠投递 6. 业务事件统一分发对于传统 C# WinForms、.NET Framework 老项目来说不建议直接在客户端或老服务端里硬接 RocketMQ。更合理的方式是新增一个现代化的 .NET 6 / .NET 8 后台服务专门负责 RocketMQ 消息消费、业务落库、幂等控制、事件分发。最终架构应该是业务事件 ↓ RocketMQ ↓ C# 后台服务 ↓ 数据库 / WebSocket / HTTP API ↓ 业务客户端这就是从“接口调用型系统”向“事件驱动型系统”的升级。对于做企业软件、呼叫中心、订单系统、工单系统、即时通知系统的 C# 开发者来说RocketMQ 是一个非常值得掌握的中间件。
C# 使用阿里云 RocketMQ 接入实战,从申请到代码一次讲透
发布时间:2026/5/30 12:22:13
前言在传统 C# 项目里我们经常会遇到一种场景业务系统 A 发生了一件事业务系统 B 也要知道呼叫中心来了一通电话客服客户端要马上弹屏订单状态变更后库存、短信、日志、报表都要跟着处理某个服务临时挂了消息不能丢等它恢复后还要继续处理。如果用最简单的方式很多人第一反应是客户端定时轮询数据库 或者 服务 A 直接调用服务 B 的接口这种方式在小项目里能跑但是一旦业务复杂起来就会出现很多问题1. 服务之间强耦合 2. 接口调用失败后不好补偿 3. 客户端频繁轮询浪费资源 4. 高并发时数据库压力很大 5. 某个服务挂了消息可能直接丢失 6. 后期扩展新业务时要不断改老代码这时候就需要引入一个非常重要的中间件消息队列。本文以阿里云 RocketMQ C#为例讲清楚1. RocketMQ 是什么 2. 为什么要用 RocketMQ 3. 阿里云 RocketMQ 怎么申请和配置 4. C# 如何发送消息 5. C# 如何消费消息 6. 在真实业务中应该怎么设计 7. 消息幂等、重试、死信这些底层知识怎么理解一、RocketMQ 是什么RocketMQ 是一个分布式消息中间件。更通俗一点说它就是系统之间的“消息中转站”。比如原来系统之间是这样调用的系统 A --- 系统 B如果系统 B 挂了系统 A 调用失败业务就可能中断。用了 RocketMQ 后系统变成这样系统 A --- RocketMQ --- 系统 B系统 A 不需要关心系统 B 当前是否在线只需要把消息投递给 RocketMQ。系统 B 恢复之后再从 RocketMQ 里把消息消费出来。这就是消息队列最核心的价值解耦、削峰、异步、可靠投递。二、RocketMQ 在业务系统中的典型场景1. 呼叫中心来电弹屏比如阿里云云联络中心有一通来电用户拨打 400 电话 ↓ 阿里云云联络中心收到来电 ↓ 事件推送到 RocketMQ ↓ 我们的 C# 后台服务消费消息 ↓ 通知 WinForms 客户端弹屏客户端不用一直去查数据库也不用一直问服务端“有没有新来电”。只要来电事件进入 RocketMQ我们服务端消费到后再精准通知对应坐席即可。2. 订单系统异步处理用户下单 ↓ 订单服务写入订单 ↓ 发送 OrderCreated 消息到 RocketMQ ↓ 库存服务扣库存 ↓ 短信服务发短信 ↓ 积分服务加积分 ↓ 报表服务统计数据订单服务不需要一个一个调用库存、短信、积分、报表接口。它只负责发一条消息后面的服务各自消费。3. 日志和审计业务系统产生操作日志 ↓ 发送到 RocketMQ ↓ 日志服务异步入库这样可以避免日志写入拖慢主业务接口。三、RocketMQ 的核心概念在使用 RocketMQ 之前必须先理解几个核心概念。1. Producer生产者生产者就是发送消息的一方。比如订单服务 呼叫中心服务 用户中心服务 支付服务它们都可以作为 Producer。代码层面就是SendMessageAsync(...)2. Consumer消费者消费者就是接收并处理消息的一方。比如短信服务 库存服务 弹屏服务 日志服务 报表服务它们从 RocketMQ 中拿到消息然后执行自己的业务逻辑。3. Topic主题Topic 是消息的分类。可以理解成数据库里的“表”或者业务上的“消息通道”。比如CallCenterEventTopic // 呼叫中心事件 OrderEventTopic // 订单事件 LogEventTopic // 日志事件发送消息时要指定 Topic。消费消息时也要订阅 Topic。4. ConsumerGroup消费者分组ConsumerGroup 是消费者分组。同一个 ConsumerGroup 里面的多个消费者会共同消费同一批消息。比如有 3 台服务都属于同一个分组CallCenterEventConsumerGroup ├── 服务实例 1 ├── 服务实例 2 └── 服务实例 3RocketMQ 会把消息分摊给这 3 个实例处理这就是负载均衡。如果不同的业务都要收到同一条消息就应该使用不同的 ConsumerGroup。比如CallCenterPopupGroup // 负责弹屏 CallCenterRecordGroup // 负责通话记录 CallCenterStatisticsGroup // 负责统计这样同一条来电消息可以被多个业务系统分别消费。5. Message消息Message 就是真正传输的数据。一条消息通常包含Topic MessageId Body Tag Key PropertiesBody 一般是 JSON。比如呼叫中心来电事件{ eventId: EVT202605290001, eventType: IncomingCall, callId: CALL202605290001, caller: 13800138000, callee: 4008888888, agentId: 1001, eventTime: 2026-05-29 10:00:00 }四、阿里云 RocketMQ 怎么申请下面以阿里云控制台为例。第一步开通云消息队列 RocketMQ 版进入阿里云控制台搜索云消息队列 RocketMQ 版然后开通服务。第二步创建实例进入 RocketMQ 控制台后创建实例。通常需要关注1. 地域 2. 实例类型 3. 网络类型 4. 是否需要公网访问 5. 规格和计费模式如果你的 C# 服务部署在阿里云 ECS 上建议优先使用 VPC 内网访问。如果你的 C# 服务部署在本地服务器或者其他云服务器上就需要使用公网接入点。第三步获取 EndpointEndpoint 就是 RocketMQ 的接入地址。示例格式类似rmq-cn-xxxx.cn-hangzhou.rmq.aliyuncs.com:8080注意区分公网接入点 VPC 内网接入点本地开发调试一般用公网接入点。正式生产环境如果服务部署在阿里云 ECS建议用 VPC 接入点。第四步创建 TopicTopic 是消息主题。比如我们创建一个呼叫中心事件主题CallCenterEventTopic也可以创建订单主题OrderEventTopic命名建议业务域 Event Topic例如CallCenterEventTopic OrderEventTopic UserEventTopic第五步创建 ConsumerGroup创建消费者分组。比如GID_CallCenterEventConsumer命名建议GID_业务名称_消费用途例如GID_CallCenter_Popup GID_CallCenter_Record GID_Order_Stock GID_Order_SMS第六步获取 AccessKey 和 SecretKeyC# 程序连接阿里云 RocketMQ 时需要认证信息。通常需要Endpoint InstanceId Topic ConsumerGroup AccessKey SecretKey这些信息不要写死在代码里建议放到appsettings.json 环境变量 配置中心生产环境不要把 AccessKey 和 SecretKey 放到前端客户端更不要提交到 Git 仓库。五、为什么不建议 WinForms 客户端直接连接 RocketMQ很多 C# 开发者第一反应可能是我 WinForms 客户端直接连 RocketMQ 不就行了吗不建议这样做。原因有几个1. AccessKey 和 SecretKey 会暴露在客户端 2. 客户端数量多连接和权限不好管理 3. WinForms 客户端断网、关机、异常退出都很常见 4. 消息消费应该由服务端保证可靠性 5. 客户端更适合接收服务端推送而不是直接消费 MQ正确架构应该是阿里云 RocketMQ ↓ C# 后台服务消费消息 ↓ 写数据库 ↓ 通过 WebSocket / SignalR / TCP 通知客户端 ↓ WinForms 弹屏也就是说RocketMQ 是服务端组件不是客户端组件。六、C# 项目准备建议使用.NET 6 或 .NET 8 Worker Service不建议使用.NET Framework 2.0 .NET Framework 4.0 .NET Framework 4.5 老项目直接硬接老项目可以通过 HTTP API、WebSocket、TCP 等方式和新的 MQ 服务通信。推荐架构老 WinForms 客户端 ↓ 你自己的业务服务 ↓ 新的 .NET 8 RocketMQ 消费服务 ↓ 阿里云 RocketMQ七、创建 .NET Worker Service创建项目dotnet new worker -n RocketMqDemo cd RocketMqDemo安装 RocketMQ SDKdotnet add package RocketMQ.Client如果项目中使用配置文件可以安装dotnet add package Microsoft.Extensions.Configuration dotnet add package Microsoft.Extensions.Configuration.Json dotnet add package Microsoft.Extensions.Hosting八、配置 appsettings.json新增配置文件{ RocketMq: { Endpoint: rmq-cn-xxxx.cn-hangzhou.rmq.aliyuncs.com:8080, InstanceId: rmq-cn-xxxx, Topic: CallCenterEventTopic, ConsumerGroup: GID_CallCenterEventConsumer, AccessKey: 你的AccessKey, SecretKey: 你的SecretKey } }生产环境建议使用环境变量覆盖敏感配置。例如RocketMq__AccessKey RocketMq__SecretKey九、定义配置类public class RocketMqOptions { public string Endpoint { get; set; } public string InstanceId { get; set; } public string Topic { get; set; } public string ConsumerGroup { get; set; } public string AccessKey { get; set; } public string SecretKey { get; set; } }十、定义业务事件模型以呼叫中心来电事件为例public class CallCenterEvent { public string EventId { get; set; } public string EventType { get; set; } public string CallId { get; set; } public string Caller { get; set; } public string Callee { get; set; } public string AgentId { get; set; } public DateTime EventTime { get; set; } }字段解释EventId事件唯一 ID用于幂等控制 EventType事件类型比如 IncomingCall、CallAnswered、CallHangup CallId通话唯一 ID Caller主叫号码 Callee被叫号码 AgentId坐席编号 EventTime事件发生时间十一、C# 发送消息示例下面是生产者代码。实际项目中生产者可以是你的业务系统也可以是测试工具。using System.Text; using System.Text.Json; using RocketMQ.Client; using RocketMQ.Client.Core; using RocketMQ.Client.Producer; using RocketMQ.Client.Message; public class RocketMqProducerService { private readonly RocketMqOptions _options; public RocketMqProducerService(RocketMqOptions options) { _options options; } public async Task SendCallCenterEventAsync(CallCenterEvent callEvent) { var credentialsProvider new StaticSessionCredentialsProvider( _options.AccessKey, _options.SecretKey ); var clientConfig new ClientConfig.Builder() .SetEndpoints(_options.Endpoint) .SetNamespace(_options.InstanceId) .SetCredentialsProvider(credentialsProvider) .Build(); var producer await new Producer.Builder() .SetClientConfig(clientConfig) .SetTopics(_options.Topic) .Build(); string json JsonSerializer.Serialize(callEvent); var message new Message.Builder() .SetTopic(_options.Topic) .SetBody(Encoding.UTF8.GetBytes(json)) .SetKeys(callEvent.EventId) .SetTag(callEvent.EventType) .Build(); var sendReceipt await producer.SendAsync(message); Console.WriteLine($消息发送成功MessageId{sendReceipt.MessageId}); } }代码解释StaticSessionCredentialsProvider用于配置 AccessKey 和 SecretKey。SetEndpoints(...)用于设置 RocketMQ 接入点。SetNamespace(...)用于设置实例 ID。Serverless 版公网访问时通常需要设置 InstanceId。SetTopics(...)用于指定生产者要发送到哪些 Topic。SetKeys(...)用于设置消息 Key建议放业务唯一标识比如 EventId、OrderId、CallId。SetTag(...)用于设置消息标签比如 IncomingCall、CallAnswered、CallHangup。后续消费者可以根据 Tag 过滤不同类型的消息。十二、C# 消费消息示例消费者负责从 RocketMQ 里读取消息并处理业务。using System.Text; using System.Text.Json; using RocketMQ.Client; using RocketMQ.Client.Core; using RocketMQ.Client.Consumer; public class RocketMqConsumerWorker : BackgroundService { private readonly RocketMqOptions _options; public RocketMqConsumerWorker(RocketMqOptions options) { _options options; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var credentialsProvider new StaticSessionCredentialsProvider( _options.AccessKey, _options.SecretKey ); var clientConfig new ClientConfig.Builder() .SetEndpoints(_options.Endpoint) .SetNamespace(_options.InstanceId) .SetCredentialsProvider(credentialsProvider) .Build(); var consumer await new SimpleConsumer.Builder() .SetClientConfig(clientConfig) .SetConsumerGroup(_options.ConsumerGroup) .SetSubscriptionExpressions(new Dictionarystring, FilterExpression { { _options.Topic, new FilterExpression(*) } }) .Build(); Console.WriteLine(RocketMQ 消费者已启动...); while (!stoppingToken.IsCancellationRequested) { try { var messages await consumer.ReceiveAsync( maxMessageNum: 16, invisibleDuration: TimeSpan.FromSeconds(30) ); foreach (var message in messages) { try { string json Encoding.UTF8.GetString(message.Body.ToArray()); Console.WriteLine(收到消息 json); var callEvent JsonSerializer.DeserializeCallCenterEvent(json); if (callEvent null) { await consumer.AckAsync(message); continue; } await HandleCallCenterEventAsync(callEvent); await consumer.AckAsync(message); } catch (Exception ex) { Console.WriteLine(处理消息失败 ex.Message); // 不 AckRocketMQ 后续会重新投递 } } } catch (Exception ex) { Console.WriteLine(拉取消息异常 ex.Message); await Task.Delay(3000, stoppingToken); } } } private async Task HandleCallCenterEventAsync(CallCenterEvent callEvent) { switch (callEvent.EventType) { case IncomingCall: Console.WriteLine($来电{callEvent.Caller}坐席{callEvent.AgentId}); break; case CallAnswered: Console.WriteLine($通话已接听CallId{callEvent.CallId}); break; case CallHangup: Console.WriteLine($通话已挂断CallId{callEvent.CallId}); break; case RecordingReady: Console.WriteLine($录音已生成CallId{callEvent.CallId}); break; default: Console.WriteLine($未知事件类型{callEvent.EventType}); break; } await Task.CompletedTask; } }十三、消费者代码重点解释1. ReceiveAsyncvar messages await consumer.ReceiveAsync(16, TimeSpan.FromSeconds(30));表示一次最多拉取 16 条消息。第二个参数表示消息的不可见时间。简单理解消费者拿到消息后这条消息会暂时对其他消费者不可见。 如果消费者处理成功并 Ack消息就算消费完成。 如果消费者没有 Ack超过不可见时间后消息会重新投递。2. AckAsyncawait consumer.AckAsync(message);Ack 表示确认消费成功。只有业务真正处理完成后才应该 Ack。不要一收到消息就 Ack否则业务处理失败时消息就丢了。正确顺序是1. 收到消息 2. 解析消息 3. 校验幂等 4. 执行业务逻辑 5. 写数据库 6. 通知客户端 7. Ack3. 处理失败不要 Ackcatch (Exception ex) { // 不 Ack等待 RocketMQ 重投 }这就是 RocketMQ 的可靠性来源之一。如果你的服务处理消息时崩溃了只要没有 Ack消息后续还能重新投递。十四、非常重要消息幂等设计RocketMQ 这类消息队列通常要按照“至少一次投递”来设计。意思是一条消息至少会投递一次但在异常情况下可能会投递多次。为什么会重复1. 消费者处理成功了但 Ack 失败 2. 消费者处理过程中网络断开 3. 消费者处理时间超过不可见时间 4. 服务重启导致消息重新投递所以业务系统必须做幂等。1. 建议数据库表CREATE TABLE MQ_Message_Log ( Id BIGINT IDENTITY(1,1) PRIMARY KEY, MessageKey NVARCHAR(100) NOT NULL, Topic NVARCHAR(100) NOT NULL, EventType NVARCHAR(50) NULL, BusinessId NVARCHAR(100) NULL, Body NVARCHAR(MAX) NULL, ConsumeStatus INT NOT NULL, CreateTime DATETIME NOT NULL DEFAULT GETDATE(), UpdateTime DATETIME NULL ); CREATE UNIQUE INDEX UX_MQ_Message_Log_MessageKey ON MQ_Message_Log(MessageKey);2. 消费前判断是否处理过private async Taskbool HasProcessedAsync(string eventId) { // 这里演示伪代码 // 实际项目中可以用 Dapper、EF Core、ADO.NET 查询数据库 string sql SELECT COUNT(1) FROM MQ_Message_Log WHERE MessageKey EventId AND ConsumeStatus 1; return await Task.FromResult(false); }3. 幂等消费流程private async Task HandleCallCenterEventAsync(CallCenterEvent callEvent) { bool processed await HasProcessedAsync(callEvent.EventId); if (processed) { Console.WriteLine($消息已处理过EventId{callEvent.EventId}); return; } switch (callEvent.EventType) { case IncomingCall: await HandleIncomingCallAsync(callEvent); break; case CallHangup: await HandleCallHangupAsync(callEvent); break; } await SaveMessageLogAsync(callEvent); }幂等设计的核心原则同一条业务消息重复消费多次最终结果应该和消费一次一样。比如同一个来电事件不能弹屏 3 次。同一个订单不能扣 3 次库存。同一个短信不能发 3 遍。十五、RocketMQ 的底层工作流程从整体链路看RocketMQ 的工作流程大概是Producer 创建消息 ↓ Producer 发送消息到 Broker ↓ Broker 持久化消息 ↓ Consumer 从 Broker 拉取消息 ↓ Consumer 执行业务逻辑 ↓ Consumer Ack 确认消费成功 ↓ Broker 标记消费进度这里面有几个核心组件1. BrokerBroker 是真正存储消息和投递消息的服务。Producer 发送消息本质上是把消息发送给 Broker。Consumer 消费消息本质上是从 Broker 获取消息。2. Topic 和 Queue一个 Topic 底层会有多个 Queue。可以理解为Topic 是逻辑分类 Queue 是实际存储和并发消费的分片多个 Queue 可以提高并发能力。3. 消费位点RocketMQ 会记录每个 ConsumerGroup 消费到哪里了。比如GID_CallCenter_Popup 消费到第 100 条 GID_CallCenter_Record 消费到第 80 条不同 ConsumerGroup 之间的消费进度互不影响。这也是为什么同一条消息可以被多个业务系统各自消费。4. 重试机制如果消费者处理失败没有 AckRocketMQ 会重新投递。这就是消费重试。但重试不是无限的。超过最大重试次数后消息可能进入死信队列。5. 死信队列死信队列用于存放一直消费失败的消息。比如消息格式错误 业务数据不存在 数据库字段超长 代码一直抛异常这些消息一直重试也没有意义就会进入死信队列。生产环境一定要监控死信队列。十六、真实项目推荐架构以呼叫中心为例推荐架构如下阿里云云联络中心 ↓ RocketMQ TopicCallCenterEventTopic ↓ .NET 8 Worker Service 消费消息 ↓ 写入 SQL Server ↓ 通过 WebSocket / SignalR / TCP 通知 WinForms 客户端 ↓ 客服客户端来电弹屏不建议这样WinForms 客户端直接连接 RocketMQ推荐这样WinForms 客户端 ↓ 你自己的业务服务 ↓ RocketMQ 消费服务 ↓ 阿里云 RocketMQ这样做的好处1. 客户端不暴露密钥 2. 消息消费集中管理 3. 可以统一做日志 4. 可以统一做幂等 5. 可以统一做异常重试 6. 可以兼容老系统 7. 后期可以扩展更多云厂商十七、C# 老项目怎么兼容很多公司还在使用.NET Framework 4.5 .NET Framework 4.0 甚至 .NET Framework 2.0这种情况下不建议在老项目里硬接 RocketMQ。更推荐新增一个现代化服务RocketMqEventService技术栈.NET 6 / .NET 8 Worker Service RocketMQ.Client SQL Server WebSocket / SignalR / HTTP API然后老项目通过 HTTP 或 TCP 调用这个新服务。架构如下老 WinForms 客户端 ↓ 老业务服务 ↓ RocketMqEventService ↓ 阿里云 RocketMQ这是一种比较稳妥的“旁路改造”方案。它不会大规模破坏老系统也能逐步引入现代化架构。十八、生产环境注意事项1. 不要把密钥写死错误做法string accessKey xxxxx; string secretKey xxxxx;推荐环境变量 配置中心 KMS 容器密钥2. 消费逻辑一定要幂等消息重复投递不是 RocketMQ 的 bug而是分布式系统的常态。只要用了消息队列就必须考虑幂等。3. 不要一收到消息就 Ack错误做法收到消息 ↓ 马上 Ack ↓ 再处理业务如果后面的业务失败了消息已经被确认无法自动重试。正确做法收到消息 ↓ 处理业务 ↓ 写数据库 ↓ 成功后 Ack4. 消费失败要记录日志消费失败时至少记录MessageId MessageKey Topic Tag Body 异常堆栈 失败时间 重试次数这样后期排查问题才有依据。5. 要监控消息堆积消息堆积说明消费者处理不过来了。常见原因1. 消费者服务挂了 2. 消费者处理速度太慢 3. 数据库性能瓶颈 4. 下游接口响应慢 5. 消费线程数量不足6. 合理拆分 Topic 和 ConsumerGroup不要所有业务都丢到一个 Topic。可以按业务域拆CallCenterEventTopic OrderEventTopic UserEventTopic LogEventTopic也不要多个完全不同的业务混用一个 ConsumerGroup。十九、RocketMQ 和数据库轮询对比对比项数据库轮询RocketMQ实时性较差较好系统耦合高低数据库压力大小失败重试需要自己做MQ 支持消息堆积能力弱强扩展性一般强适合场景小系统、低频任务中大型系统、事件驱动二十、总结RocketMQ 不是简单的“发消息工具”它更像是系统架构里的事件总线。在 C# 项目中引入 RocketMQ可以解决很多传统架构里的问题1. 服务之间解耦 2. 削峰填谷 3. 异步处理 4. 失败重试 5. 消息可靠投递 6. 业务事件统一分发对于传统 C# WinForms、.NET Framework 老项目来说不建议直接在客户端或老服务端里硬接 RocketMQ。更合理的方式是新增一个现代化的 .NET 6 / .NET 8 后台服务专门负责 RocketMQ 消息消费、业务落库、幂等控制、事件分发。最终架构应该是业务事件 ↓ RocketMQ ↓ C# 后台服务 ↓ 数据库 / WebSocket / HTTP API ↓ 业务客户端这就是从“接口调用型系统”向“事件驱动型系统”的升级。对于做企业软件、呼叫中心、订单系统、工单系统、即时通知系统的 C# 开发者来说RocketMQ 是一个非常值得掌握的中间件。