实验333

一、实验目的
掌握在 IntelliJ IDEA 社区版中创建 Spring Boot 项目的方法
理解 Spring Boot 与 RabbitMQ 4.x 的整合方法
熟悉 Spring Boot 中 RabbitMQ 的基本配置与使用
掌握消息发送、接收、确认等核心操作

步骤 1：创建基础项目
点击 File -> New -> Project
选择 Spring Boot，点击 Next
填写项目基本信息：
    GroupId: com.example
    ArtifactId: springboot-rabbitmq-demo
    Name: springboot-rabbitmq-demo
    Location: 选择项目存储路径
点击 Create，等待 IDEA 创建项目结构
创建 3 个包和里面的 Class 类：
    ① config：RabbitMQConfig
    ② controller：RabbitMQController、WorkQueueController
    ③ service：RabbitMQConsumer、RabbitMQService、WorkQueueConsumer、WorkQueueService

步骤 2：检查依赖
确保 pom.xml 包含以下依赖：

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>4.0.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springboot-rabbitmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-demo</name>
    <description>springboot-rabbitmq-demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webmvc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webmvc-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <annotationProcessorPaths>
                                <path>
                                    <groupId>org.projectlombok</groupId>
                                    <artifactId>lombok</artifactId>
                                </path>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                    <execution>
                        <id>default-testCompile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <annotationProcessorPaths>
                                <path>
                                    <groupId>org.projectlombok</groupId>
                                    <artifactId>lombok</artifactId>
                                </path>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

步骤 3：application.yml 配置
将 src/main/resources/application.properties 修改为 application.yml，并添加 RabbitMQ 连接配置：

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 生产者消息确认
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        # 消费者手动ACK
        acknowledge-mode: manual

步骤 4：RabbitMQ 配置类
创建 src/main/java/com/example/springbootrabbitmqdemo/config/RabbitMQConfig.java

package com.example.springbootrabbitmqdemo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 简单模式队列
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple.queue", true); // durable=true 表示队列持久化
    }

    // 工作队列模式队列
    @Bean
    public Queue workQueue() {
        return new Queue("work.queue", true);
    }

    // 发布/订阅模式队列
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1", true);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2", true);
    }

    // 发布/订阅模式交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }

    // 路由模式交换机
    @Bean
    public DirectExchange directExchange() {
        return new
DirectExchange("direct.exchange");
    }

    // 绑定路由模式队列
    @Bean
    public Binding bindingDirect1() {
        return BindingBuilder.bind(workQueue()).to(directExchange()).with("direct.routing.key1");
    }

    @Bean
    public Binding bindingDirect2() {
        return BindingBuilder.bind(workQueue()).to(directExchange()).with("direct.routing.key2");
    }

    // 通配符模式交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange");
    }

    // 绑定通配符模式队列
    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(workQueue()).to(topicExchange()).with("topic.#");
    }

    @Bean
    public Binding bindingTopic2() {
        return BindingBuilder.bind(workQueue()).to(topicExchange()).with("topic.*.key");
    }
}

步骤 5：简单模式实践

1. 创建服务类 src/main/java/com/example/springbootrabbitmqdemo/service/RabbitMQService.java

package com.example.springbootrabbitmqdemo.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendSimpleMessage(String message) {
        rabbitTemplate.convertAndSend("simple.queue", message);
        System.out.println("消息已发送: " + message);
    }
}

2. 创建控制器 src/main/java/com/example/springbootrabbitmqdemo/controller/RabbitMQController.java

package com.example.springbootrabbitmqdemo.controller;

import com.example.springbootrabbitmqdemo.service.RabbitMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitMQController {

    @Autowired
    private RabbitMQService rabbitMQService;

    @GetMapping("/send/simple")
    public String sendSimpleMessage() {
        rabbitMQService.sendSimpleMessage("Hello Spring Boot RabbitMQ Simple Mode!");
        return "消息已发送";
    }
}

3. 创建消费者 src/main/java/com/example/springbootrabbitmqdemo/service/RabbitMQConsumer.java

package com.example.springbootrabbitmqdemo.service;

import com.rabbitmq.client.Channel;
import
org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class RabbitMQConsumer {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConsumer.class);

    @RabbitListener(queues = "simple.queue")
    public void receiveSimpleMessage(String message, Channel channel, Message amqpMessage) throws IOException {
        log.info("收到简单模式消息: {}", message);
        try {
            // 处理完消息后手动确认
            channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理失败时拒绝消息，重回队列
            channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

步骤 6：工作队列模式实践

1. 创建工作队列服务 src/main/java/com/example/springbootrabbitmqdemo/service/WorkQueueService.java

package com.example.springbootrabbitmqdemo.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class WorkQueueService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendWorkMessage(String message) {
        rabbitTemplate.convertAndSend("work.queue", message);
        System.out.println("工作队列消息已发送: " + message);
    }
}

2. 创建工作队列控制器 src/main/java/com/example/springbootrabbitmqdemo/controller/WorkQueueController.java

package com.example.springbootrabbitmqdemo.controller;

import com.example.springbootrabbitmqdemo.service.WorkQueueService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class WorkQueueController {

    @Autowired
    private WorkQueueService workQueueService;

    @GetMapping("/send/work")
    public String sendWorkMessage() {
        for (int i = 0; i < 10; i++) {
            workQueueService.sendWorkMessage("Work Queue Message " + i);
        }
        return "工作队列消息已发送";
    }
}

3. 创建工作队列消费者 src/main/java/com/example/springbootrabbitmqdemo/service/WorkQueueConsumer.java

package com.example.springbootrabbitmqdemo.service;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class WorkQueueConsumer {

    private static final Logger log = LoggerFactory.getLogger(WorkQueueConsumer.class);

    @RabbitListener(queues = "work.queue", concurrency = "2")
    public void receiveWorkMessage(String message, Channel channel, Message amqpMessage) throws IOException {
        log.info("收到工作队列消息: {}", message);
        try {
            Thread.sleep(1000); // 模拟处理时间
            // 手动确认消息
            channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // 处理失败时可以选择拒绝消息，重回队列或丢弃
            channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

二、运行与验证

步骤 1：启动项目
在 IDEA 中运行 RabbitMQApplication 类（若沿用 IDEA 自动生成的启动类，类名通常为 SpringbootRabbitmqDemoApplication，请以实际项目为准）。
项目启动后，控制台将显示 RabbitMQ 连接信息：
- Web 服务已经在 8080 端口正常运行，接口可以正常访问；
- 项目已经成功连接到本地的 RabbitMQ 服务，队列和交换机的声明也会自动完成；
- 没有配置错误或依赖冲突。

步骤 2：验证简单模式
访问 http://localhost:8080/send/simple
查看控制台输出，确认消息已发送。
查看消费者控制台，确认消息已被消费。
控制台成功打印：收到简单模式消息: Hello Spring Boot RabbitMQ Simple Mode!

步骤 3：验证工作队列模式
访问 http://localhost:8080/send/work
查看控制台输出，确认 10 条工作队列消息已发送。
查看两个消费者控制台，确认消息被轮询消费。
[ntContainer#1-1] 和 [ntContainer#1-2] 是两个不同的消费线程：
- 消息 0、2、3、6、8 被线程 #1-1 处理；
- 消息 1、4、5、7、9 被线程 #1-2 处理。
10 条消息被两个线程轮流分摊，没有一条漏处理，这就是标准的轮询效果，已经完全验证了工作队列的核心功能：同一个队列的消息被多个消费者线程 / 实例轮询消费，实现负载均衡。

三、查看 RabbitMQ 界面
访问 http://localhost:15672/ ，查看 Queues and Streams：
1、所有队列消息全部消费完成：simple.queue 和 work.queue 的 Ready、Unacked、Total 都为 0，说明你发送的所有消息都已经被消费者成功接收、处理并确认了，没有任何残留消息。
2、队列状态正常：所有队列的 State 都是 running，说明队列运行稳定，随时可以接收新的消息。
3、其他队列也已准备就绪：fanout.queue1、fanout.queue2、notification.queue、order.queue 都已声明完成，这说明你已经提前配置好了发布订阅、路由模式需要的队列，可以直接开始测试了。
（注：notification.queue、order.queue 并未在本实验的 RabbitMQConfig 中声明，应为 RabbitMQ 服务中此前已存在的队列；另外 RabbitMQConfig 中尚未把 fanout.queue1、fanout.queue2 绑定到 fanout.exchange，测试发布订阅模式前需先补充相应的 Binding。）

参考资源
RabbitMQ 4.x 官方教程：https://rabbitmq.cn/tutorials/tutorial-one-java
Spring Boot 
官方文档：https://docs.spring.io/spring-boot/docs/2.7.x/reference/html/messaging.html#messaging.rabbitmq （该链接对应 Spring Boot 2.7.x；本实验使用 4.0.6，当前版本文档见 https://docs.spring.io/spring-boot/reference/messaging/amqp.html ）
Spring AMQP 官方文档：https://docs.spring.io/spring-amqp/reference/html/#overview
RabbitMQ 4.x 版本特性：https://www.rabbitmq.com/releases/rabbitmq-server/v4.0.0/README.html