个人主页代码不加冰欢迎来访作者简介java后端学习者❄️个人专栏LeetCode刷题日记 苍穹外卖日记SSM框架深入JavaWeb✨命运的结局尽可永在不屈的挑战却不可须臾或缺前言大家好我是代码不加冰前面我们安装完成 了RabbitMQ并对控制台操作进行了大致的了解通过一个案例来进行实践这里我们继续深入学习一下。摘要消息队列是分布式系统的核心组件而 RabbitMQ 凭借其稳定性和灵活性成为首选。本文从零开始手把手带你掌握 RabbitMQ 的三大核心交换机 Fanout 广播模式支付成功后同时通知订单、积分、短信系统一呼百应 Direct 直连模式ERROR 日志发钉钉告警、INFO 日志写文件归档精准路由 Topic 主题模式外卖平台通过order.#、user.*等通配符实现灵活订阅微服务解耦利器。文章提供完整的 Spring Boot 整合代码涵盖RabbitTemplate发送消息、RabbitListener注解消费、YAML 配置详解以及 Work Queue 工作队列的公平分发机制。通过快递分拣、日志处理、事件总线等真实场景类比让复杂概念一目了然。无论你是新手还是进阶者这篇保姆级教程都能让你快速上手 RabbitMQ写出生产级别的消息驱动代码前置知识AMQP Advanced Message Queuing Protocol高级消息队列协议简单说它是一种标准化的消息传递规则让不同的程序之间可以跨语言、跨平台地通信。通俗理解把 AMQP 想象成快递行业的统一标准场景快递行业AMQP标准规范快递包裹有统一尺寸、面单格式消息有固定格式和规则发送方你寄件人生产者Producer中转站快递分拣中心交换机Exchange目的地你的地址/快递柜队列Queue接收方收件人消费者Consumer有了这个标准顺丰寄出的包裹中通能送到同样的道理Python 写的生产者发的消息Java 写的消费者能收到。有 AMQP 时统一标准text Python程序 ──→ │ Java程序 ──→ │ AMQP协议 ──→ 任何支持AMQP的消息中间件 Go程序 ──→ │AMQP 的核心概念AMQP 概念RabbitMQ 实现作用Producer生产者代码发消息的一方Consumer消费者代码收消息的一方Exchange交换机Direct、Topic、Fanout等决定消息去哪个队列Queue队列存消息的地方Binding绑定关系把交换机和队列连接起来Virtual Host虚拟主机隔离环境类似数据库的schemaSpring AMQP 是什么Spring AMQP是 Spring 框架对 AMQP 协议的封装和增强让你用 Spring 的方式操作 RabbitMQ而不需要直接写原生的 AMQP 代码。Spring AMQP Spring 团队写的 RabbitMQ 便捷工具包Spring AMQP 的核心组件组件作用原生 AMQP 对应RabbitTemplate发送消息的工具类Channel.basicPublishRabbitListener监听队列、自动消费手写 Consumer 循环MessageConverter自动转 Java 对象为消息手动序列化RabbitAdmin自动声明队列/交换机手动调用 queueDeclarejava客户端入门案例1、创建 Spring Boot 项目在 IDEA 中File → New → Project → Spring Initializr依赖选择Spring Web可选Spring Boot DevTools可选2、pom.xml 依赖xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion parent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version2.7.14/version relativePath/ /parent groupIdcom.example/groupId artifactIdrabbitmq-springboot-demo/artifactId version1.0.0/version properties java.version11/java.version /properties dependencies !-- Spring Boot AMQP Starter包含RabbitMQ -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency !-- Spring Boot Web可选用于测试 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency !-- Lombok可选简化代码 -- dependency groupIdorg.projectlombok/groupId artifactIdlombok/artifactId optionaltrue/optional /dependency !-- 测试依赖 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-test/artifactId scopetest/scope /dependency /dependencies /project3、application.yml 配置文件yaml spring: rabbitmq: # 基础连接配置 host: localhost port: 5672 username: admin password: 123456 virtual-host: / # 连接池配置可选 connection-timeout: 5000 # 连接超时毫秒 # 生产者确认机制可选 publisher-confirm-type: correlated # 开启发送确认 publisher-returns: true # 开启消息返回 # 消费者配置可选 listener: simple: acknowledge-mode: auto # 自动确认 concurrency: 3 # 最小消费者数 max-concurrency: 10 # 最大消费者数 prefetch: 1 # 每次拉取消息数 retry: enabled: true # 开启重试 max-attempts: 3 # 最大重试次数 initial-interval: 1000 # 重试间隔毫秒 # 自定义配置项目相关 rabbitmq: queue: name: hello_queue exchange: name: hello_exchange routing: key: hello_routing_key4、配置类读取YAML方式一使用 ConfigurationPropertiesjava import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; Data Component ConfigurationProperties(prefix rabbitmq) public class RabbitMQProperties { private QueueConfig queue new QueueConfig(); private ExchangeConfig exchange new ExchangeConfig(); private RoutingConfig routing new RoutingConfig(); Data public static class QueueConfig { private String name; } Data public static class ExchangeConfig { private String name; } Data public static class RoutingConfig { private String key; } }5、RabbitMQ 配置类java import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; Configuration public class RabbitMQConfig { Value(${spring.rabbitmq.host}) private String host; Value(${spring.rabbitmq.port}) private int port; Value(${spring.rabbitmq.username}) private String username; Value(${spring.rabbitmq.password}) private String password; Value(${spring.rabbitmq.virtual-host}) private String virtualHost; Value(${rabbitmq.queue.name}) private String queueName; Value(${rabbitmq.exchange.name}) private String exchangeName; Value(${rabbitmq.routing.key}) private String routingKey; /** * 创建连接工厂 */ Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory new CachingConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); // 开启发布确认 factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); factory.setPublisherReturns(true); return factory; } /** * 创建 RabbitTemplate用于发送消息 */ Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template new RabbitTemplate(connectionFactory); // 设置强制消息路由失败后返回 template.setMandatory(true); return template; } /** * 创建 RabbitAdmin用于声明队列、交换机、绑定 */ Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } /** * 声明队列 */ Bean public Queue queue() { // 参数name, durable(持久化), exclusive(独占), autoDelete(自动删除) return new Queue(queueName, true, false, false); } /** * 声明直连交换机 */ Bean public DirectExchange exchange() { return new DirectExchange(exchangeName, true, false); } /** * 绑定队列到交换机 */ Bean public Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(routingKey); } }6、生产者服务发送消息java import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; Slf4j Service RequiredArgsConstructor public class MessageProducer { private final RabbitTemplate rabbitTemplate; Value(${rabbitmq.exchange.name}) private String exchangeName; Value(${rabbitmq.routing.key}) private String routingKey; /** * 发送消息 */ public void sendMessage(String message) { // 生成唯一ID用于追踪 CorrelationData correlationId new CorrelationData(java.util.UUID.randomUUID().toString()); log.info( 准备发送消息: {}, message); // 发送消息 rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationId); // 设置发送确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - { if (ack) { log.info(✅ 消息发送成功! correlationId: {}, correlationData.getId()); } else { log.error(❌ 消息发送失败! cause: {}, cause); } }); log.info( 消息已发送到交换机: {}, 路由键: {}, exchangeName, routingKey); } /** * 发送对象消息自动转JSON */ public void sendObject(Object object) { rabbitTemplate.convertAndSend(exchangeName, routingKey, object); log.info( 对象消息已发送: {}, object); } }7、消费者服务接收消息java import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; Slf4j Service public class MessageConsumer { /** * 监听队列消费消息 */ RabbitListener(queues ${rabbitmq.queue.name}) public void receiveMessage(String message) { log.info( 收到消息: {}, message); // 处理业务逻辑 processMessage(message); } /** * 处理消息的业务逻辑 */ private void processMessage(String message) { // 模拟业务处理 log.info( 正在处理消息: {}, message); // 这里可以添加你的业务代码 } /** * 监听队列消费对象消息 */ RabbitListener(queues ${rabbitmq.queue.name}) public void receiveObject(Object object) { log.info( 收到对象消息: {}, object); } }8、测试 Controller方便测试java import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.*; RestController RequestMapping(/api/message) RequiredArgsConstructor public class MessageController { private final MessageProducer messageProducer; PostMapping(/send) public String sendMessage(RequestParam String content) { messageProducer.sendMessage(content); return 消息已发送: content; } PostMapping(/send/json) public String sendJson(RequestBody User user) { messageProducer.sendObject(user); return 用户信息已发送: user.getName(); } // 测试用实体类 static class User { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name name; } public int getAge() { return age; } public void setAge(int age) { this.age age; } Override public String toString() { return User{name name , age age }; } } }9、启动类java import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication public class RabbitMQApplication { public static void main(String[] args) { SpringApplication.run(RabbitMQApplication.class, args); System.out.println( RabbitMQ Demo 启动成功!); System.out.println( 访问: http://localhost:8080/api/message/send?contenthello); } }以上就是简单的入门案例然后我们继续深入一点Work Queue工作队列模型Work Queue是 RabbitMQ 中一个队列多个消费者的模式消息会被平均分配给各个消费者。模型图解text ┌─────────────┐ │ Producer │ │ (生产者) │ └──────┬──────┘ │ ▼ ┌─────────────┐ │ Queue │ │ Task Queue │ │ [m1][m2][m3]│ └──────┬──────┘ │ ┌────────────┼────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │Consumer1 │ │Consumer2 │ │Consumer3 │ │ (C1) │ │ (C2) │ │ (C3) │ └──────────┘ └──────────┘ └──────────┘ 核心特点一个队列多个消费者消息默认轮询分发Round-Robin每个消息只能被一个消费者消费常见面试题问题答案Work Queue 的特点一个队列多个消费者消息只能被一个消费者消费轮询分发的问题处理快的消费者和慢的消费者工作量相同整体效率低如何解决轮询分发问题basicQos(1) 手动确认消息确认的作用保证消息不丢失处理失败可重新入队Fanout 交换机广播交换机Fanout 交换机把收到的消息广播到所有绑定的队列不管 Routing Key 是什么。核心特点特点说明广播模式消息发送给所有绑定的队列忽略 Routing KeyRouting Key 写什么都被忽略一对多一个消息 → 多个消费者同时收到业务场景用户支付成功后需要同时通知多个系统text 用户支付100元 │ ▼ 支付成功事件 ──Fanout──→ 同时发给3个系统 │ ├──→ 订单系统更新订单状态为已支付 ├──→ 积分系统给用户加100积分 └──→ 短信系统发短信通知用户Direct 交换机直连交换机核心特点精确匹配Routing Key 必须完全相等一对一或一对多多个队列可以用同一个 Routing Keytext 应用产生日志 │ ▼ Direct Exchange (log_exchange) │ ├── Routing Key ERROR ──→ 错误队列 ──→ 运维人员发钉钉告警 ├── Routing Key WARN ──→ 警告队列 ──→ 存入数据库待查 └── Routing Key INFO ──→ 信息队列 ──→ 写入文件归档业务场景一个电商系统不同级别的日志需要不同处理为什么用 Direct✅ ERROR 消息只给告警系统✅ WARN 消息只给分析系统✅ INFO 消息只给归档系统✅精确匹配不会发错Topic 交换机主题交换机核心特点模糊匹配支持通配符*和#更灵活的路由规则通配符规则通配符含义示例*匹配一个词log.*.error→ 匹配log.app.error#匹配零个或多个词log.#→ 匹配log.app.error,log.app,log.app.service.error分隔符用.分隔单词user.order.create业务场景一个外卖平台各种事件需要灵活订阅text 事件类型Routing Key 设计: ├── order.created # 订单创建 ├── order.cancelled # 订单取消 ├── order.delivered # 订单送达 ├── user.registered # 用户注册 ├── user.login # 用户登录 ├── payment.success # 支付成功 ├── payment.refund # 退款成功 ├── rider.accepted # 骑手接单 ├── rider.arrived # 骑手到店 └── rider.completed # 骑手送达不同服务的订阅规则服务订阅规则匹配到的事件短信服务order.#和user.#所有订单和用户相关事件统计分析#.created和#.cancelled所有创建和取消事件骑手APPrider.*所有骑手相关事件财务服务payment.#所有支付相关事件风控服务*.cancelled和payment.refund取消事件和退款事件
RabbitMQ 从入门到实战!一文搞懂核心交换机 + Spring Boot 整合,附完整代码
发布时间:2026/6/4 0:26:17
个人主页代码不加冰欢迎来访作者简介java后端学习者❄️个人专栏LeetCode刷题日记 苍穹外卖日记SSM框架深入JavaWeb✨命运的结局尽可永在不屈的挑战却不可须臾或缺前言大家好我是代码不加冰前面我们安装完成 了RabbitMQ并对控制台操作进行了大致的了解通过一个案例来进行实践这里我们继续深入学习一下。摘要消息队列是分布式系统的核心组件而 RabbitMQ 凭借其稳定性和灵活性成为首选。本文从零开始手把手带你掌握 RabbitMQ 的三大核心交换机 Fanout 广播模式支付成功后同时通知订单、积分、短信系统一呼百应 Direct 直连模式ERROR 日志发钉钉告警、INFO 日志写文件归档精准路由 Topic 主题模式外卖平台通过order.#、user.*等通配符实现灵活订阅微服务解耦利器。文章提供完整的 Spring Boot 整合代码涵盖RabbitTemplate发送消息、RabbitListener注解消费、YAML 配置详解以及 Work Queue 工作队列的公平分发机制。通过快递分拣、日志处理、事件总线等真实场景类比让复杂概念一目了然。无论你是新手还是进阶者这篇保姆级教程都能让你快速上手 RabbitMQ写出生产级别的消息驱动代码前置知识AMQP Advanced Message Queuing Protocol高级消息队列协议简单说它是一种标准化的消息传递规则让不同的程序之间可以跨语言、跨平台地通信。通俗理解把 AMQP 想象成快递行业的统一标准场景快递行业AMQP标准规范快递包裹有统一尺寸、面单格式消息有固定格式和规则发送方你寄件人生产者Producer中转站快递分拣中心交换机Exchange目的地你的地址/快递柜队列Queue接收方收件人消费者Consumer有了这个标准顺丰寄出的包裹中通能送到同样的道理Python 写的生产者发的消息Java 写的消费者能收到。有 AMQP 时统一标准text Python程序 ──→ │ Java程序 ──→ │ AMQP协议 ──→ 任何支持AMQP的消息中间件 Go程序 ──→ │AMQP 的核心概念AMQP 概念RabbitMQ 实现作用Producer生产者代码发消息的一方Consumer消费者代码收消息的一方Exchange交换机Direct、Topic、Fanout等决定消息去哪个队列Queue队列存消息的地方Binding绑定关系把交换机和队列连接起来Virtual Host虚拟主机隔离环境类似数据库的schemaSpring AMQP 是什么Spring AMQP是 Spring 框架对 AMQP 协议的封装和增强让你用 Spring 的方式操作 RabbitMQ而不需要直接写原生的 AMQP 代码。Spring AMQP Spring 团队写的 RabbitMQ 便捷工具包Spring AMQP 的核心组件组件作用原生 AMQP 对应RabbitTemplate发送消息的工具类Channel.basicPublishRabbitListener监听队列、自动消费手写 Consumer 循环MessageConverter自动转 Java 对象为消息手动序列化RabbitAdmin自动声明队列/交换机手动调用 queueDeclarejava客户端入门案例1、创建 Spring Boot 项目在 IDEA 中File → New → Project → Spring Initializr依赖选择Spring Web可选Spring Boot DevTools可选2、pom.xml 依赖xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion parent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version2.7.14/version relativePath/ /parent groupIdcom.example/groupId artifactIdrabbitmq-springboot-demo/artifactId version1.0.0/version properties java.version11/java.version /properties dependencies !-- Spring Boot AMQP Starter包含RabbitMQ -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency !-- Spring Boot Web可选用于测试 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency !-- Lombok可选简化代码 -- dependency groupIdorg.projectlombok/groupId artifactIdlombok/artifactId optionaltrue/optional /dependency !-- 测试依赖 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-test/artifactId scopetest/scope /dependency /dependencies /project3、application.yml 配置文件yaml spring: rabbitmq: # 基础连接配置 host: localhost port: 5672 username: admin password: 123456 virtual-host: / # 连接池配置可选 connection-timeout: 5000 # 连接超时毫秒 # 生产者确认机制可选 publisher-confirm-type: correlated # 开启发送确认 publisher-returns: true # 开启消息返回 # 消费者配置可选 listener: simple: acknowledge-mode: auto # 自动确认 concurrency: 3 # 最小消费者数 max-concurrency: 10 # 最大消费者数 prefetch: 1 # 每次拉取消息数 retry: enabled: true # 开启重试 max-attempts: 3 # 最大重试次数 initial-interval: 1000 # 重试间隔毫秒 # 自定义配置项目相关 rabbitmq: queue: name: hello_queue exchange: name: hello_exchange routing: key: hello_routing_key4、配置类读取YAML方式一使用 ConfigurationPropertiesjava import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; Data Component ConfigurationProperties(prefix rabbitmq) public class RabbitMQProperties { private QueueConfig queue new QueueConfig(); private ExchangeConfig exchange new ExchangeConfig(); private RoutingConfig routing new RoutingConfig(); Data public static class QueueConfig { private String name; } Data public static class ExchangeConfig { private String name; } Data public static class RoutingConfig { private String key; } }5、RabbitMQ 配置类java import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; Configuration public class RabbitMQConfig { Value(${spring.rabbitmq.host}) private String host; Value(${spring.rabbitmq.port}) private int port; Value(${spring.rabbitmq.username}) private String username; Value(${spring.rabbitmq.password}) private String password; Value(${spring.rabbitmq.virtual-host}) private String virtualHost; Value(${rabbitmq.queue.name}) private String queueName; Value(${rabbitmq.exchange.name}) private String exchangeName; Value(${rabbitmq.routing.key}) private String routingKey; /** * 创建连接工厂 */ Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory new CachingConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); // 开启发布确认 factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); factory.setPublisherReturns(true); return factory; } /** * 创建 RabbitTemplate用于发送消息 */ Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template new RabbitTemplate(connectionFactory); // 设置强制消息路由失败后返回 template.setMandatory(true); return template; } /** * 创建 RabbitAdmin用于声明队列、交换机、绑定 */ Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } /** * 声明队列 */ Bean public Queue queue() { // 参数name, durable(持久化), exclusive(独占), autoDelete(自动删除) return new Queue(queueName, true, false, false); } /** * 声明直连交换机 */ Bean public DirectExchange exchange() { return new DirectExchange(exchangeName, true, false); } /** * 绑定队列到交换机 */ Bean public Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(routingKey); } }6、生产者服务发送消息java import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; Slf4j Service RequiredArgsConstructor public class MessageProducer { private final RabbitTemplate rabbitTemplate; Value(${rabbitmq.exchange.name}) private String exchangeName; Value(${rabbitmq.routing.key}) private String routingKey; /** * 发送消息 */ public void sendMessage(String message) { // 生成唯一ID用于追踪 CorrelationData correlationId new CorrelationData(java.util.UUID.randomUUID().toString()); log.info( 准备发送消息: {}, message); // 发送消息 rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationId); // 设置发送确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - { if (ack) { log.info(✅ 消息发送成功! correlationId: {}, correlationData.getId()); } else { log.error(❌ 消息发送失败! cause: {}, cause); } }); log.info( 消息已发送到交换机: {}, 路由键: {}, exchangeName, routingKey); } /** * 发送对象消息自动转JSON */ public void sendObject(Object object) { rabbitTemplate.convertAndSend(exchangeName, routingKey, object); log.info( 对象消息已发送: {}, object); } }7、消费者服务接收消息java import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; Slf4j Service public class MessageConsumer { /** * 监听队列消费消息 */ RabbitListener(queues ${rabbitmq.queue.name}) public void receiveMessage(String message) { log.info( 收到消息: {}, message); // 处理业务逻辑 processMessage(message); } /** * 处理消息的业务逻辑 */ private void processMessage(String message) { // 模拟业务处理 log.info( 正在处理消息: {}, message); // 这里可以添加你的业务代码 } /** * 监听队列消费对象消息 */ RabbitListener(queues ${rabbitmq.queue.name}) public void receiveObject(Object object) { log.info( 收到对象消息: {}, object); } }8、测试 Controller方便测试java import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.*; RestController RequestMapping(/api/message) RequiredArgsConstructor public class MessageController { private final MessageProducer messageProducer; PostMapping(/send) public String sendMessage(RequestParam String content) { messageProducer.sendMessage(content); return 消息已发送: content; } PostMapping(/send/json) public String sendJson(RequestBody User user) { messageProducer.sendObject(user); return 用户信息已发送: user.getName(); } // 测试用实体类 static class User { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name name; } public int getAge() { return age; } public void setAge(int age) { this.age age; } Override public String toString() { return User{name name , age age }; } } }9、启动类java import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication public class RabbitMQApplication { public static void main(String[] args) { SpringApplication.run(RabbitMQApplication.class, args); System.out.println( RabbitMQ Demo 启动成功!); System.out.println( 访问: http://localhost:8080/api/message/send?contenthello); } }以上就是简单的入门案例然后我们继续深入一点Work Queue工作队列模型Work Queue是 RabbitMQ 中一个队列多个消费者的模式消息会被平均分配给各个消费者。模型图解text ┌─────────────┐ │ Producer │ │ (生产者) │ └──────┬──────┘ │ ▼ ┌─────────────┐ │ Queue │ │ Task Queue │ │ [m1][m2][m3]│ └──────┬──────┘ │ ┌────────────┼────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │Consumer1 │ │Consumer2 │ │Consumer3 │ │ (C1) │ │ (C2) │ │ (C3) │ └──────────┘ └──────────┘ └──────────┘ 核心特点一个队列多个消费者消息默认轮询分发Round-Robin每个消息只能被一个消费者消费常见面试题问题答案Work Queue 的特点一个队列多个消费者消息只能被一个消费者消费轮询分发的问题处理快的消费者和慢的消费者工作量相同整体效率低如何解决轮询分发问题basicQos(1) 手动确认消息确认的作用保证消息不丢失处理失败可重新入队Fanout 交换机广播交换机Fanout 交换机把收到的消息广播到所有绑定的队列不管 Routing Key 是什么。核心特点特点说明广播模式消息发送给所有绑定的队列忽略 Routing KeyRouting Key 写什么都被忽略一对多一个消息 → 多个消费者同时收到业务场景用户支付成功后需要同时通知多个系统text 用户支付100元 │ ▼ 支付成功事件 ──Fanout──→ 同时发给3个系统 │ ├──→ 订单系统更新订单状态为已支付 ├──→ 积分系统给用户加100积分 └──→ 短信系统发短信通知用户Direct 交换机直连交换机核心特点精确匹配Routing Key 必须完全相等一对一或一对多多个队列可以用同一个 Routing Keytext 应用产生日志 │ ▼ Direct Exchange (log_exchange) │ ├── Routing Key ERROR ──→ 错误队列 ──→ 运维人员发钉钉告警 ├── Routing Key WARN ──→ 警告队列 ──→ 存入数据库待查 └── Routing Key INFO ──→ 信息队列 ──→ 写入文件归档业务场景一个电商系统不同级别的日志需要不同处理为什么用 Direct✅ ERROR 消息只给告警系统✅ WARN 消息只给分析系统✅ INFO 消息只给归档系统✅精确匹配不会发错Topic 交换机主题交换机核心特点模糊匹配支持通配符*和#更灵活的路由规则通配符规则通配符含义示例*匹配一个词log.*.error→ 匹配log.app.error#匹配零个或多个词log.#→ 匹配log.app.error,log.app,log.app.service.error分隔符用.分隔单词user.order.create业务场景一个外卖平台各种事件需要灵活订阅text 事件类型Routing Key 设计: ├── order.created # 订单创建 ├── order.cancelled # 订单取消 ├── order.delivered # 订单送达 ├── user.registered # 用户注册 ├── user.login # 用户登录 ├── payment.success # 支付成功 ├── payment.refund # 退款成功 ├── rider.accepted # 骑手接单 ├── rider.arrived # 骑手到店 └── rider.completed # 骑手送达不同服务的订阅规则服务订阅规则匹配到的事件短信服务order.#和user.#所有订单和用户相关事件统计分析#.created和#.cancelled所有创建和取消事件骑手APPrider.*所有骑手相关事件财务服务payment.#所有支付相关事件风控服务*.cancelled和payment.refund取消事件和退款事件