natsexample是 goweb3 项目中NATS 消息队列的示例模块演示了如何使用gonats模块进行异步消息发布和订阅。一、模块架构plainTextnatsexample/ ├── main.go # 主入口启动 NATS 服务 ├── startnats/ │ └── startnats.go # NATS 启动配置和发布逻辑 ├── handler/ │ └── handler.go # 消息处理器同步/异步 └── test/ └── publish_cliasync_test.go # 测试用例二、核心组件详解1. main.go - 启动入口gofunc main() { var conn, err startnats.StartNats() if err ! nil { golog.Error(StartNats err:, err) } defer conn.Close() for { time.Sleep(time.Second * 1200000) // 保持服务运行 } }功能说明调用startnats.StartNats()启动 NATS 订阅阻塞主进程保持服务运行2. startnats.go - NATS 启动配置gofunc StartNats() (*nats.Conn, error) { var topic goconsts.Topic_Async_DefaultPrefix topic goconsts.Topic_Async_Website_Default // 使用网站默认主题 golog.Info(topic is :, topic) return startnats.StartSubscribeAsyncTopic(topic, handler.HandleMessageAsync) } func PublisMsg() { var msg gomsg.FindBeanGonatsMsg() msg.Header.ObjectId gconv.String(baseutils.SnowflakeNextVal()) msg.Header.Topic goconsts.Topic_Async_Default var result basedto.NewIchubResult() msg.SetData(result) msg.Header.MsgReq true var err cliasync.FindBeanCliAsync().PublishMsg(msg.ToNats()) if err ! nil { golog.Error(err) } }关键组件组件说明goconsts主题常量定义gomsg.FindBeanGonatsMsg()获取消息封装对象cliasync.FindBeanCliAsync()获取异步客户端startnats.StartSubscribeAsyncTopic()启动异步订阅3. handler.go - 消息处理器go// 同步消息处理无响应 func HandleMessage(msg *nats.Msg) { logrus.Info(time.Now(), jsonutils.ToJsonPretty(msg)) var gonatsMsg gomsg.FindBeanGonatsMsg() gonatsMsg.From(msg) golog.Info(handler handleMessage Async: , gonatsMsg) var body string(gonatsMsg.Data) logrus.Info(body) } // 异步消息处理无响应 func HandleMessageAsync(msg *nats.Msg) { logrus.Info(time.Now(), jsonutils.ToJsonPretty(msg)) var gonatsMsg gomsg.FindBeanGonatsMsg() gonatsMsg.From(msg) golog.Info(handler handleMessage Async: , gonatsMsg) var body string(gonatsMsg.Data) logrus.Info(body) } // 同步请求-响应模式 func HandleMessageSync(msg *nats.Msg) { var gonatsMsg gomsg.FindBeanGonatsMsg() gonatsMsg.From(msg) gonatsMsg.Body string(gonatsMsg.Data) golog.Info(server handleMessageSync , gonatsMsg) var resp gonatsMsg.ToNats() resp.Data []byte(server handleMessageSync resp) var config ichubconfig.FindBeanIchubConfig() resp.Data config.ReadIchubWebClient().ToJsonBytes() var err msg.RespondMsg(resp) // 回复消息 if err ! nil { golog.Error(server handleMessageSync , err) } }三种处理模式模式函数特点同步处理HandleMessage普通订阅无响应异步处理HandleMessageAsync异步订阅无响应请求-响应HandleMessageSync接收请求并回复响应三、消息封装结构消息通过gomsg.GonatsMsg封装plainTextGonatsMsg ├── Header # 消息头 │ ├── ObjectId # 唯一标识Snowflake ID │ ├── Topic # 主题 │ └── MsgReq # 是否为请求消息 ├── Body # 消息体字符串 └── Data # 消息数据字节数组四、测试用例分析gofunc (suite *TestCliAsyncSuite) Test001_asyncCli_publish() { var msg gomsg.FindBeanGonatsMsg() msg.Header.ObjectId gconv.String(baseutils.SnowflakeNextVal()) msg.Header.Topic goconsts.Topic_DOMAIN_GENEAL_AsyncES_BIZ var result basedto.NewIchubResult() msg.SetData(result) msg.Header.MsgReq true var err cliasync.FindBeanCliAsync().Publish(msg) if err ! nil { golog.Error(err) } }测试流程获取GonatsMsg消息对象设置消息头ObjectId、Topic设置消息数据通过CliAsync发布消息五、依赖关系plainTextnatsexample ├── goconfig/ichublog/golog # 日志 ├── goconfig/ichubconfig # 配置 ├── gonats/natscore/goconsts # 主题常量 ├── gonats/natscore/gomsg # 消息封装 ├── gonats/natsserver/async/cliasync # 异步客户端 └── gonats/natsserver/startnats # 订阅启动六、运行流程plainText┌─────────────────────────────────────────────────────────────┐ │ natsexample 运行流程 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. main() 启动 │ │ │ │ │ ▼ │ │ 2. startnats.StartNats() │ │ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ 设置订阅主题 Topic_Async_Website_Default │ │ │ │ │ 调用 startnats.StartSubscribeAsyncTopic │ │ │ │ │ 注册 handler.HandleMessageAsync 处理器 │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ 3. 阻塞等待消息 │ │ │ │ │ ▼ │ │ 4. 收到消息 → HandleMessageAsync() 处理 │ │ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ gomsg.FindBeanGonatsMsg().From(msg) │ │ │ │ │ 解析 NATS 消息为 GonatsMsg │ │ │ │ │ 业务处理... │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘七、消息发布流程go// 发布消息示例 func PublisMsg() { // 1. 获取消息对象 var msg gomsg.FindBeanGonatsMsg() // 2. 设置消息头 msg.Header.ObjectId gconv.String(baseutils.SnowflakeNextVal()) msg.Header.Topic goconsts.Topic_Async_Default msg.Header.MsgReq true // 标记为请求消息 // 3. 设置消息数据 var result basedto.NewIchubResult() msg.SetData(result) // 4. 发布消息 var err cliasync.FindBeanCliAsync().PublishMsg(msg.ToNats()) }八、主题常量主题常量说明Topic_Async_DefaultPrefix默认前缀Topic_Async_Default默认主题Topic_Async_Website_Default网站默认主题Topic_DOMAIN_GENEAL_AsyncES_BIZES 业务异步主题九、总结natsexample模块展示了 goweb3 项目中NATS 消息队列的使用方式特性实现异步消息通过CliAsync发布HandleMessageAsync处理同步请求-响应HandleMessageSync支持请求响应模式消息封装GonatsMsg统一封装消息头和数据分布式通信基于 NATS 实现松耦合的服务间通信唯一标识使用 Snowflake 生成 ObjectId
goweb3系列解析10: natsexample 模块解析
发布时间:2026/6/7 11:47:34
natsexample是 goweb3 项目中NATS 消息队列的示例模块演示了如何使用gonats模块进行异步消息发布和订阅。一、模块架构plainTextnatsexample/ ├── main.go # 主入口启动 NATS 服务 ├── startnats/ │ └── startnats.go # NATS 启动配置和发布逻辑 ├── handler/ │ └── handler.go # 消息处理器同步/异步 └── test/ └── publish_cliasync_test.go # 测试用例二、核心组件详解1. main.go - 启动入口gofunc main() { var conn, err startnats.StartNats() if err ! nil { golog.Error(StartNats err:, err) } defer conn.Close() for { time.Sleep(time.Second * 1200000) // 保持服务运行 } }功能说明调用startnats.StartNats()启动 NATS 订阅阻塞主进程保持服务运行2. startnats.go - NATS 启动配置gofunc StartNats() (*nats.Conn, error) { var topic goconsts.Topic_Async_DefaultPrefix topic goconsts.Topic_Async_Website_Default // 使用网站默认主题 golog.Info(topic is :, topic) return startnats.StartSubscribeAsyncTopic(topic, handler.HandleMessageAsync) } func PublisMsg() { var msg gomsg.FindBeanGonatsMsg() msg.Header.ObjectId gconv.String(baseutils.SnowflakeNextVal()) msg.Header.Topic goconsts.Topic_Async_Default var result basedto.NewIchubResult() msg.SetData(result) msg.Header.MsgReq true var err cliasync.FindBeanCliAsync().PublishMsg(msg.ToNats()) if err ! nil { golog.Error(err) } }关键组件组件说明goconsts主题常量定义gomsg.FindBeanGonatsMsg()获取消息封装对象cliasync.FindBeanCliAsync()获取异步客户端startnats.StartSubscribeAsyncTopic()启动异步订阅3. handler.go - 消息处理器go// 同步消息处理无响应 func HandleMessage(msg *nats.Msg) { logrus.Info(time.Now(), jsonutils.ToJsonPretty(msg)) var gonatsMsg gomsg.FindBeanGonatsMsg() gonatsMsg.From(msg) golog.Info(handler handleMessage Async: , gonatsMsg) var body string(gonatsMsg.Data) logrus.Info(body) } // 异步消息处理无响应 func HandleMessageAsync(msg *nats.Msg) { logrus.Info(time.Now(), jsonutils.ToJsonPretty(msg)) var gonatsMsg gomsg.FindBeanGonatsMsg() gonatsMsg.From(msg) golog.Info(handler handleMessage Async: , gonatsMsg) var body string(gonatsMsg.Data) logrus.Info(body) } // 同步请求-响应模式 func HandleMessageSync(msg *nats.Msg) { var gonatsMsg gomsg.FindBeanGonatsMsg() gonatsMsg.From(msg) gonatsMsg.Body string(gonatsMsg.Data) golog.Info(server handleMessageSync , gonatsMsg) var resp gonatsMsg.ToNats() resp.Data []byte(server handleMessageSync resp) var config ichubconfig.FindBeanIchubConfig() resp.Data config.ReadIchubWebClient().ToJsonBytes() var err msg.RespondMsg(resp) // 回复消息 if err ! nil { golog.Error(server handleMessageSync , err) } }三种处理模式模式函数特点同步处理HandleMessage普通订阅无响应异步处理HandleMessageAsync异步订阅无响应请求-响应HandleMessageSync接收请求并回复响应三、消息封装结构消息通过gomsg.GonatsMsg封装plainTextGonatsMsg ├── Header # 消息头 │ ├── ObjectId # 唯一标识Snowflake ID │ ├── Topic # 主题 │ └── MsgReq # 是否为请求消息 ├── Body # 消息体字符串 └── Data # 消息数据字节数组四、测试用例分析gofunc (suite *TestCliAsyncSuite) Test001_asyncCli_publish() { var msg gomsg.FindBeanGonatsMsg() msg.Header.ObjectId gconv.String(baseutils.SnowflakeNextVal()) msg.Header.Topic goconsts.Topic_DOMAIN_GENEAL_AsyncES_BIZ var result basedto.NewIchubResult() msg.SetData(result) msg.Header.MsgReq true var err cliasync.FindBeanCliAsync().Publish(msg) if err ! nil { golog.Error(err) } }测试流程获取GonatsMsg消息对象设置消息头ObjectId、Topic设置消息数据通过CliAsync发布消息五、依赖关系plainTextnatsexample ├── goconfig/ichublog/golog # 日志 ├── goconfig/ichubconfig # 配置 ├── gonats/natscore/goconsts # 主题常量 ├── gonats/natscore/gomsg # 消息封装 ├── gonats/natsserver/async/cliasync # 异步客户端 └── gonats/natsserver/startnats # 订阅启动六、运行流程plainText┌─────────────────────────────────────────────────────────────┐ │ natsexample 运行流程 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. main() 启动 │ │ │ │ │ ▼ │ │ 2. startnats.StartNats() │ │ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ 设置订阅主题 Topic_Async_Website_Default │ │ │ │ │ 调用 startnats.StartSubscribeAsyncTopic │ │ │ │ │ 注册 handler.HandleMessageAsync 处理器 │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ 3. 阻塞等待消息 │ │ │ │ │ ▼ │ │ 4. 收到消息 → HandleMessageAsync() 处理 │ │ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ gomsg.FindBeanGonatsMsg().From(msg) │ │ │ │ │ 解析 NATS 消息为 GonatsMsg │ │ │ │ │ 业务处理... │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘七、消息发布流程go// 发布消息示例 func PublisMsg() { // 1. 获取消息对象 var msg gomsg.FindBeanGonatsMsg() // 2. 设置消息头 msg.Header.ObjectId gconv.String(baseutils.SnowflakeNextVal()) msg.Header.Topic goconsts.Topic_Async_Default msg.Header.MsgReq true // 标记为请求消息 // 3. 设置消息数据 var result basedto.NewIchubResult() msg.SetData(result) // 4. 发布消息 var err cliasync.FindBeanCliAsync().PublishMsg(msg.ToNats()) }八、主题常量主题常量说明Topic_Async_DefaultPrefix默认前缀Topic_Async_Default默认主题Topic_Async_Website_Default网站默认主题Topic_DOMAIN_GENEAL_AsyncES_BIZES 业务异步主题九、总结natsexample模块展示了 goweb3 项目中NATS 消息队列的使用方式特性实现异步消息通过CliAsync发布HandleMessageAsync处理同步请求-响应HandleMessageSync支持请求响应模式消息封装GonatsMsg统一封装消息头和数据分布式通信基于 NATS 实现松耦合的服务间通信唯一标识使用 Snowflake 生成 ObjectId