RabbitMQ---开篇 1.什么是MQMQ是Message Queue的缩写从字面意思来说就是一个消息队列也就是一个用来存储消息的队列消息可以非常简单比如只包含文本字符串JSON格式的数据也可以很复杂比如内嵌对象2.MQ的作用1.异步解耦我们在根据实际业务去写代码时可能这个业务中的有些操作是非常耗时的但是这些操作不追求实时的返回结果这时候就可以通过MQ异步的去实现这些操作。比如用户注册时如果用户注册成功此时就需要向用户发送短信或者邮件此时这些发送短信和邮件的操作就可以通过MQ异步的处理此时就不用让用户等待短信或者邮件发送成功后用户才会变成注册成功的状态2.流量削峰为了应对一些访问量突增的情况为了保证应用能够正常的运行就需要对这种访问量突增的情况进行防范。但是这种访问量突增的情况并不是很常见的可能就只会在一些类似于双十一的日子出现此时如果按照这种流量突增的峰值去购入其他服务器但是这样就会导致这些服务器在未来有一部分就会处于空闲状态这个是很没有性价比的。此时就可以考虑MQ中间件这种性价比高的方式来应对这种流量突增的情况可以MQ来控制流量先将请求放到MQ中然后系统根据自己的能力去逐步得去处理这些请求3.消息分发假设此时有多个系统需要对同一份的数据的变化做出反应常见的做法可能就是轮询数据库比如订单系统、积分系统等系统都需要知道“支付成功”这些系统就可能会定时去查询数据库看看有没有新的支付记录产生但是这种操作是定时的就会导致数据的实时性差且有可能给数据造成压力。此时MQ就派上用场了举个例子假设用户支付成功了此时就会生成一条支付成功的记录此时就可以将这条记录以消息的形式发送给MQ并存储在MQ中此时就可以让那些需要知道支付记录的系统自己去MQ订阅和消费对应的消息即可4.延迟通知延迟通知就是当希望某个消息不希望立即被消费者处理而是需要等待一段时间后再被消费从而实现延迟执行或者延迟通知的效果比如在订单超时未支付自动取消此时用户下单后发送一条延迟消息到MQ中30分钟后再被消费消费时检查订单是否已支付如果没有则取消订单3.RabbitMQRabbitMQ是采用Erlang语言实现的AMQP的消息中间件为了在分布式系统中存储和转发消息而设计的4.RabbitMQ的核心概念RabbitMQ是一个消息中间件,也是一个生产者消费者模型,它负责接收,存储并转发消息,下图是RabbitMQ的一个抽象图下图是RabbitMQ的具体工作流程图4.1 ProducerProducer就是生产者,是RabbitMQ的客户端,向RabbitMQ发送消息,在实际的业务中生产者发送的消息通常是一个带有业务逻辑结构的数据,比如JSON字符串,消息也可以带哟一定的标签,RabbitMQ会根据标签进行路由(就是根据标签将消息发生给对应的队列中)4.2 ConsumerConsumer就是消费者,也是RabbitMQ的客户端,从RabbitMQ中获取消息,当消费者连接到RabbitMQ的服务器,就可以消费队列中的消息了,当对应的消息被消费后,该消息的标签就会被丢弃.4.3 BrokerBroker可以看做是RabbitMQ的服务器,Broker有多个Virtual Host(虚拟机),这个虚拟机不是我们平常所说的虚拟机,可以看做类似于MySQL中的库的概念,一个虚拟机中包含多个Exchange(交换机)和Queue(队列)4.4 Virtual HostVirtual Host就是虚拟主机,它为消息队列提供了一种逻辑上的隔离机制,一个Broker中可以分割为多个Virtual Host,当有多个用户共同使用一个RabbitMQ Broker时,此时不同用户就可以在各自的Virtual Host中创建各自的Exchange和Queue来使用4.5 QueueQueue是RabbitMQ的内置队列,就是队列,不过是一个用来存储消息的队列4.6 ExchangeExchange就是交换机,当消费者发送消息给Broker时,首先Broker中对应的Virtual Host里的Exchange会先接收到消息,并根据消息的标签将消息转发给一个或者多个队列(Queue)4.7 ConnectionConnection就是建立连接,如果生产者想要向RabbitMQ发送消息或者消费者想要消费RabbitMQ里面的消息,必须要要和RabbitMQ的服务器建立连接,这个连接就是Connection,一次Connection就是一次TCP连接4.8 ChannelChannel就是通道,为了解决多个生产者同时发送消息或者多个消费者同时去消费消息而频繁建立连接,消费资源,每个Connection中可以有多个Channel,消息的发送和消费都是基于Channel的,这样就可以将多个消息的读写复用到一个TCP连接上,就可以减少建立和关闭连接带来的开销.5.RabbitMQ快速上手---生产者代码import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { // 队列名称消费者也要用同一个 private static final String QUEUE_NAME hello_queue; public static void main(String[] args) throws Exception { // 1️⃣ 创建连接工厂类似数据库连接池的配置 ConnectionFactory factory new ConnectionFactory(); // 2️⃣ 设置 RabbitMQ 服务器地址 factory.setHost(localhost); // 本地运行 factory.setPort(5672); // RabbitMQ 默认端口 factory.setUsername(guest); // 默认用户名 factory.setPassword(guest); // 默认密码 // 3️⃣ 建立 TCP 连接真实网络连接 Connection connection factory.newConnection(); // 4️⃣ 创建一个通信通道Channel // ⚠️ RabbitMQ 几乎所有操作都在 Channel 上进行 Channel channel connection.createChannel(); // 5️⃣ 声明一个队列如果不存在就创建 /* 参数说明 queue – 队列名 durable – 是否持久化false重启后消失 exclusive – 是否排他仅当前连接可用 autoDelete – 是否自动删除没有消费者时删除 arguments – 额外参数一般 null */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 6️⃣ 要发送的消息 String message Hello RabbitMQ!; // 7️⃣ 发送消息 /* exchange – 交换机 表示默认交换机 routingKey – 路由键这里直接用队列名 props – 消息属性比如持久化、优先级 body – 消息内容必须是字节数组 */ channel.basicPublish( , // 默认交换机 QUEUE_NAME, // 队列名 null, // 无额外属性 message.getBytes() // 消息转成字节 ); // 8️⃣ 打印日志 System.out.println( [x] Sent message ); // 9️⃣ 关闭资源生产环境建议用 try-with-resources channel.close(); connection.close(); } }6.RabbitMQ快速上手---消费者代码import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { // 队列名必须和生产者一致 private static final String QUEUE_NAME hello_queue; public static void main(String[] args) throws Exception { // 1️⃣ 创建连接工厂 ConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost); factory.setPort(5672); factory.setUsername(guest); factory.setPassword(guest); // 2️⃣ 创建连接 Connection connection factory.newConnection(); // 3️⃣ 创建通道 Channel channel connection.createChannel(); // 4️⃣ 声明队列防止队列不存在 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println( [*] Waiting for messages...); // 5️⃣ 创建 DeliverCallback不用 lambda DeliverCallback deliverCallback new DeliverCallback() { Override public void handle(String consumerTag, Delivery delivery) throws IOException { // 把消息转成字符串 String message new String(delivery.getBody(), UTF-8); System.out.println( [x] Received: message); } }; // 6️⃣ 创建 CancelCallback不用 lambda CancelCallback cancelCallback new CancelCallback() { Override public void handle(String consumerTag) { System.out.println( [x] Consumer cancelled: consumerTag); } }; // 7️⃣ 开始消费消息 channel.basicConsume( QUEUE_NAME, // 队列名 true, // 自动确认 deliverCallback, cancelCallback ); } }