昨晚12点,女朋友突然问我:你会RabbitMQ吗?我竟然愣住了。
01为什么要用消息队列?
1.1 同步调用和异步调用
在说起消息队列之前,必须要先说一下同步调用和异步调用。
同步调用:A服务去调用B服务,需要一直等着B服务,直到B服务执行完毕并把执行结果返回给A之后,A才能继续往下执行。
举个例子:过年回到家,老妈对你说:“你也不小了,该谈女朋友了,隔壁王阿姨给你……。”“妈!我谈的有!”
老妈嘴角微微上扬:“那她现在有空吗?让妈给你把把关。”
你被逼之下跟女朋友开视频说:“那个我妈在我旁边,她想跟你说说话。”
你女朋友一下子慌了,立马拿起眉笔、口红、遮瑕对你说:“你先别挂,等我2分钟,我稍微化一下妆。”
你就一直等着她,等她化好妆之后你把手机给了你老妈。所以同步调用的核心就是:等待。
异步调用:A服务去调用B服务,不用一直等待B服务的执行结果,也就是说在B服务执行的同时A服务可以接着执行下面的程序。
举个例子:上午10点钟,办公室里,正在上班的你给你女朋友发微信说:“亲爱的,等你不忙了给我发一张你的照片吧,我想你了。”然后你接着工作了。
等到下午2点你女朋友给你发了一张她的美颜照,你点开看了看,迷的颠三倒四。所以异步调用的核心就是:只用通知对方一下,不用等待,通知完我这边该干嘛干嘛!
上面所说的异步调用就是用消息队列去实现。
1.2 为什么要用消息队列?
场景一:用户注册
现在很多网站都需要给注册的用户发送注册短信或者激活邮箱,如果使用同步调用的话用户只有注册成功后才能给用户发送短信和邮箱链接,这样花费的时间就会很长。
有了消息队列之后我们只需要将用户注册的信息写入到消息队列里面,接来下该干嘛干嘛。
发送邮箱和发送短信的服务随时从消息队列里面取出该用户的信息,然后再去发送短信和邮箱链接。这样花费的时间就会大大减少。
场景二:修改商品
在微服务项目中,有时候数据量太多的话就需要分库分表,例如下图中商品表分别存储在A数据库和B数据库中。
有一天我们去调用修改商品的服务去修改A数据库中的商品信息,由于我们还需要调用搜索商品的服务查询商品信息,所以修改完A库中的商品信息后必须保证B库中的商品信息和A库一样。
如果采用同步调用的方式,在修改完A库的商品信息之后需要等待B库的商品信息修改完,这样耗时过长。
有了消息队列之后我们修改完A库的商品信息之后只需要将要修改的商品信息写入消息队列中,接下来该干什么干什么。
搜索商品的服务从消息队列中读取要修改的商品信息,然后同步B库中的商品信息,这样就大大地缩短响应时间。
02 RabbitMQ介绍
2.1 什么是MQ
MQ(Message Quene) : 江湖人称消息队列,小名又叫消息中间件。消息队列基于生产者和消费者模型,生产者不断向消息队列中发送消息,消费者不断从队列中获取消息。
因为消息的生产和消费都是异步的,而且没有业务逻辑的侵入,所以可以轻松的实现系统间解耦。
2.2 MQ有哪些
当今市面上有很多消息中间件,ActiveMQ、RabbitMQ、Kafka以及阿里巴巴自研的消息中间件RocketMQ等。
2.3 不同MQ特点
-
RabbitMQ 稳定可靠,支持多协议,有消息确认,基于erlang语言。
-
Kafka高吞吐,高性能,快速持久化,无消息确认,无消息遗漏,可能会有有重复消息,依赖于zookeeper,成本高。
-
ActiveMQ不够灵活轻巧,对队列较多情况支持不好。
-
RocketMQ性能好,高吞吐,高可用性,支持大规模分布式,协议支持单一。
2.4 RabbitMQ
基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
AMQP:即Advanced Message Queuing Protocol, 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
RabbitMQ主要特性:
-
保证可靠性:使用一些机制来保证可靠性,如持久化、传输确认、发布确认
-
可伸缩性:支持消息集群,多台RabbitMQ服务器可以组成一个集群
-
高可用性:RabbitMQ集群中的某个节点出现问题时队列任然可用
-
支持多种协议
-
支持多语言客户端
-
提供良好的管理界面
-
提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因
-
提供插件机制:可通过插件进行多方面扩展
03 RabbitMQ安装及配置
3.1 docker安装RabbitMQ
3.1.1 获取RabbitMQ镜像
指定版本,该版本包含了RabbitMQ的后台图形化页面
docker pull rabbitmq:management
3.1.2 运行RabbitMQ镜像
方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
3.2 本地安装RabbitMQ
3.2.1 因为RabbitMQ是用erlang语言开发的,所以安装之前先删除erlang包
yum remove erlang*
3.2.2 将RabbitMQ安装包上传到linux服务器上
erlang-23.2.1-1.el7.x86_64.rpm
rabbitmq-server-3.8.9-1.el7.noarch.rpm
3.2.3 安装Erlang依赖包
rpm -ivh erlang-23.2.1-1.el7.x86_64.rpm
3.2.4 安装RabbitMQ安装包(需要联网)
yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm
注意:安装完成后配置文件在:/usr/share/doc/rabbitmq-server-3.8.9/rabbitmq.config.example目录中,需要 将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config
3.2.5 复制配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
3.2.6 查看配置文件
ls /etc/rabbitmq/rabbitmq.config
3.2.7 修改配置文件
vim /etc/rabbitmq/rabbitmq.config
将上图中框着的部分修改为下图:
3.2.8 启动rabbitmq中的插件管理
rabbitmq-plugins enable rabbitmq_management
3.2.9 查看服务状态
systemctl status rabbitmq-server
rabbitmq常用命令
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server
3.2.10 如果是买的服务器,记得安全组开放15672和5672端口
3.2.11 访问RabbitMQ的后台图形化管理界面
- 浏览器地址栏输入:http://ip:15672
- 登录管理界面
username:guest
password:guest
3.3 Admin用户和虚拟主机管理
3.3.1 添加用户
上面的Tags选项,其实是指定用户的角色。超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
3.3.2 创建虚拟主机
虚拟主机:为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。
其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。
3.3.3 绑定虚拟主机和用户
创建好虚拟主机,我们还要给用户添加访问权限。点击添加好的虚拟主机,进入虚拟机设置界面。
04 RabbitMQ的4种消息模式
4.1 简单模式
说白了就是一个生产者发送消息,一个消费者接受消息,一对一的关系。
在上图的模型中,有以下概念:
producer:生产者,消息发送者
consumer:消费者:消息的接受者
queue:消息队列,图中红色部分。类似一个仓库,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
4.2 工作模式
说白了就是一个生产者发送消息,多个消费者接受消息。只要其中的一个消费者抢先接收到了消息,其他的就接收不到了。一对多的关系。
4.3 广播模式
这里引入了交换机(Exchange)的概念,交换机绑定所有的队列。也就是说消息生产者会先把消息发送给交换机,然后交换机把消息发送到与它绑定的所有队列里面,消费者从它所绑定的队列里面获取消息。
在广播模式下,消息发送流程是这样的:
-
可以有多个消费者
-
每个消费者有自己的queue(队列)
-
每个队列都要绑定到Exchange(交换机)
-
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
-
交换机把消息发送给绑定过的所有队列
-
队列的消费者都能拿到消息。实现一条消息被多个消费者消费
4.4 路由模式
4.4.1 Routing之订阅模型-Direct(直连)
举个例子:消息生产者发送消息时给了交换机一个红桃A,消息生产者对交换机说:”这条消息只能给有红桃A的队列“。交换机发现队列一手里是黑桃K,队列二手里是红桃A,所以它将这条消息给了队列二。
在路由-直连模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
-
消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey。
-
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
4.4.2 Routing 之订阅模型-Topic
举个例子:消息生产者发送消息时给了交换机一个暗号:hello.mq,消息生产者对交换机说:”这条消息只能给暗号以hello开头的队列“。交换机发现它与队列一的暗号是hello.java,与队列二的暗号是news.today,所以它将这条消息给了队列一。
Topic类型的交换机与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如:b.hello
05 Maven 应用整合 RabbitMQ
5.1 创建 SpringBoot 项目,引入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
</dependencies>
5.2 创建 RabbitMQ 的连接参数工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//ip地址
factory.setHost("##.##.##.##");
//端口
factory.setPort(5672);
//虚拟主机
factory.setVirtualHost("myhost");
//账户
factory.setUsername("root");
//密码
factory.setPassword("########");
Connection connection = factory.newConnection();
return connection;
}
}
5.3 第一种:简单模式
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue", false, false, false, null);
// 写到队列中的消息内容
String message = "你好啊,mq!";
// 参数1 交换机,此处没有
// 参数2 发送到哪个队列
// 参数3 属性
// 参数4 内容
channel.basicPublish("", "queue", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消息消费者
public class Consumer {
public static void main(String[] args) throws Exception {
//获取RabbitMq的连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("简单模式获取消息:"+new String(body));
}
});
}
}
测试结果:
5.4 第二种:工作模式
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue", false, false, false, null);
// 写到队列中的消息内容
String message = "你好啊,mq";
// 参数1 交换机,此处无
// 参数2 发送到哪个队列
// 参数3 属性
// 参数4 内容
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "queue", null, (message+i).getBytes());
}
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者01
public class ConsumerOne {
public static void main(String[] args) throws Exception {
//创建一个RabbitMq的连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者01:"+new String(body));
}
});
}
}
消费者02
public class ConsumerTwo {
public static void main(String[] args) throws Exception {
//创建一个RabbitMq的连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者02:"+new String(body));
}
});
}
}
测试结果:
消费者01
消费者02
5.5 第三种:广播模式
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue01", false, false, false, null);
channel.queueDeclare("queue02", false, false, false, null);
//创建交换机,如果存在就不创建。并指定交换机的类型是FANOUT即广播模式
channel.exchangeDeclare("fanout-exchange", BuiltinExchangeType.FANOUT);
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里不指定key
channel.queueBind("queue01", "fanout-exchange", "");
channel.queueBind("queue02", "fanout-exchange", "");
// 消息内容
String message = "这是一条广播消息";
// 参数1 交换机
// 参数2 发送到哪个队列,因为指定了交换机,所以这里队列名为空
// 参数3 属性
// 参数4 内容
channel.basicPublish("fanout-exchange", "", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者01
public class ConsumerOne {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMq连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue01",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者01:"+new String(body));
}
});
}
}
消费者02
public class ConsumerTwo {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMq连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue02",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者02:"+new String(body));
}
});
}
}
测试结果
5.6 第四种 路由模式
1)路由模式之Direct(直连)
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue03", false, false, false, null);
channel.queueDeclare("queue04", false, false, false, null);
//创建交换机,如果存在就不创建。并指定交换机的类型是DIRECT模式
channel.exchangeDeclare("direct-exchange", BuiltinExchangeType.DIRECT);
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里指定路由key是a
channel.queueBind("queue03", "direct-exchange", "a");
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里指定路由key是b
channel.queueBind("queue04", "direct-exchange", "b");
//消息
String message = "这是一条key为a的消息";
// 参数1 交换机
// 参数2 路由key
// 参数3 属性
// 参数4 内容
channel.basicPublish("direct-exchange", "a", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者03
public class ConsumerThree {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMQ连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue03",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者03:"+new String(body));
}
});
}
}
消费者04
public class ConsumerFour {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMQ连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue04",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者04:"+new String(body));
}
});
}
}
测试结果
只有消费者03收到了消息
2)路由模式之-Topic
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue05", false, false, false, null);
channel.queueDeclare("queue06", false, false, false, null);
//创建交换机,如果存在就不创建。并指定交换机的类型是TOPIC模式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里指定路由key是a.*
//*是通配符,意思只要key满足a开头,.后面是什么都可以
channel.queueBind("queue05", "topic-exchange", "a.*");
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里指定路由key是b.*
//*是通配符,意思只要key满足b开头,.后面是什么都可以
channel.queueBind("queue06", "topic-exchange", "b.*");
// channel.queueDeclare("queue", false, false, false, null);
// 消息内容
String message = "这是一条key为a.hello的消息";
// 参数1 交换机,此处无
// 参数2 路由key
// 参数3 属性
// 参数4 内容
channel.basicPublish("topic-exchange", "a.hello", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消息消费者05
public class ConsumerFive {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMQ连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue05",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者05:"+new String(body));
}
});
}
}
消息消费者06
public class ConsumerSix {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMQ连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue06",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者06:"+new String(body));
}
});
}
}
测试结果
06 SpringBoot 整合 RabbitMQ
6.1 创建 SpringBoot 项目,引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
6.2 配置配置文件
spring:
application:
name: mq-springboot
rabbitmq:
host: ##.##.##.##
port: 5672
username: root
password: #####
virtual-host: myhost
6.3 第一种:简单模式
消息生产者:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("quenue","你好mq");
}
消息消费者
@Component
public class SingleCunstomer {
//监听的队列
@RabbitListener(queues = "queue")
public void receive(String message){
System.out.println("消息:" + message);
}
}
6.4 第二种:工作模式
消息生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("quenue","你好mq!");
}
}
消息消费者
@Component
public class WorkCunstomer {
@RabbitListener(queues = "queue")
public void customerOne(String message){
System.out.println("消费者一:" + message);
}
@RabbitListener(queues = "queue")
public void customerTwo(String message){
System.out.println("消费者二:" + message);
}
}
6.5 第三种:广播模式
消息生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
//参数1 交换机 参数2 路由key 参数三 消息
rabbitTemplate.convertAndSend("fanout-exchange","","这是一条广播消息");
}
消息消费者
@Component
public class FanoutCunstomer {
@RabbitListener(queues = "queue01")
public void customerOne(String message){
System.out.println("消费者一:" + message);
}
@RabbitListener(queues = "queue02")
public void customerTwo(String message){
System.out.println("消费者二:" + message);
}
}
6.6 第4种:路由模式
1)Direct(直连)模式
消息生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
//参数1 交换机 参数2 路由key 参数三 消息
rabbitTemplate.convertAndSend("direct-exchange","a","这是一条广播消息");
}
消息消费者
@Component
public class DirectCunstomer {
//监听的队列 queue03
@RabbitListener(queues = "queue03")
//监听的队列 queue04
public void customerOne(String message){
System.out.println("消费者一:" + message);
}
@RabbitListener(queues = "queue04")
public void customerTwo(String message){
System.out.println("消费者二:" + message);
}
}
2)Topic模式
消息生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
//参数1 交换机 参数2 路由key 参数三 消息
rabbitTemplate.convertAndSend("topic-exchange","a.hello","这是一条广播消息");
}
消息消费者
@Component
public class TopicCunstomer {
//监听的队列 queue05
@RabbitListener(queues = "queue05")
public void customerOne(String message){
System.out.println("消费者一:" + message);
}
//监听的队列 queue06
@RabbitListener(queues = "queue06")
public void customerTwo(String message){
System.out.println("消费者二:" + message);
}
}
6.7 SpringBoot 应用中通过配置完成队列的创建
@Configuration
public class RabbitMQConfiguration {
//创建队列
@Bean
public Queue queue1(){
Queue queue9 = new Queue("queue1");
return queue9;
}
@Bean
public Queue queue2(){
Queue queue2 = new Queue("queue2");
//设置队列属性
return queue2;
}
//创建广播模式交换机
@Bean
public FanoutExchange ex1(){
return new FanoutExchange("ex1");
}
//创建路由模式-direct交换机
@Bean
public DirectExchange ex2(){
return new DirectExchange("ex2");
}
//绑定队列
@Bean
public Binding bindingQueue1(Queue queue1, DirectExchange ex2){
return BindingBuilder.bind(queue1).to(ex2).with("a1");
}
@Bean
public Binding bindingQueue2(Queue queue2, DirectExchange ex2){
return BindingBuilder.bind(queue2).to(ex2).with("a2");
}
}
6.8 使用RabbitMQ发送-接收对象
消息生产者:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void sendMsg() {
User user = new User();
user.setId(1).setAge(16).setUsername("张飞");
rabbitTemplate.convertAndSend("queue",user);
}
消息消费者
public class SingleCunstomer {
//监听的队列
@RabbitListener(queues = "queue")
public void receive(User user){
System.out.println("对象:" + user);
}
}
07 RabbitMQ 消息确认机制
所谓消息确认机制就是消息生产者有没有将消息发出去?生产者有没有将消息发给交换机,交换机有没有将消息发到队列里面?消息消费者是否成功的从队列里面获取到了消息?
就像你在网上买东西,商家有没有将快递发到你家小区楼下的快递驿站?你有没有成功的从快递驿站拿到你的快递?
所以RabbitMQ的消息确认机制包括消息发送端的确认机制和消息消费端的确认机制。
消息发送端:
- confirm机制:消息生产者是否成功的将消息发送到交换机。
- return机制:交换机是否成功的将消息发送到队列。
消息消费端:消息消费者是否成功的从队列获取到了消息。
7.1 SpringBoot配置消息确认
消息发送端消息确认配置
# 消息发送到交换器确认
spring.rabbitmq.publisher-confirm-type=correlated
# 消息发送到队列确认
spring.rabbitmq.publisher-returns=true
7.2 消息发送到交换机监听类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
//消息发送到交换机监听类
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息成功发送到交换机! correlationData:{}", correlationData);
} else {
log.info("消息发送到交换机失败! correlationData:{}", correlationData);
}
}
}
7.3 消息未路由到队列监听类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
//消息未路由到队列监听类
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("Fail... message:{},从交换机exchange:{},以路由键routingKey:{}," + "未找到匹配队列,replyCode:{},replyText:{}",
message, exchange, routingKey, replyCode, replyText);
}
}
7.4 重新注入RabbitTemplate,并设置两个监听类
@Configuration
public class RabbitMQConfig {
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback( new SendConfirmCallback());
rabbitTemplate.setReturnCallback(new SendReturnCallback());
return rabbitTemplate;
}
}
7.5 消费端确认
添加配置
# 消费者消息确认--手动 ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费者代码
@Component
@RabbitListener(queues = RabbitMQConfig.TASK_QUEUE_NAME)
public class Receiver {
@RabbitHandler
public void process(String content, Channel channel, Message message) {
try {
// 业务处理成功后调用,消息会被确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 业务处理失败后调用
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
08 RabbitMQ 死信队列实现消息延迟
8.1 什么是延迟队列
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
8.2 RabbitMQ如何实现延迟队列?
AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过TTL(Time To Live)特性模拟出延迟队列的功能。
8.3 消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间.
8.4 实现延迟队列
延迟任务通过消息的TTL来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。
场景:使用延迟队列实现订单支付监控
8.5 代码实现
RabbitMQConfig
@Configuration
public class RabbitMQConfig {
//交换机
public static final String EXCHANGE = "delay.exchange";
//死信队列
public static final String DELAY_QUEUE = "delay.queue";
//死信队列与交换机绑定的路由key
public static final String DELAY_ROUTING_KEY = "delay.key";
//业务队列
public static final String TASK_QUEUE_NAME = "task.queue";
//业务队列与交换机绑定的路由key
public static final String TASK_ROUTING_KEY = "task.key";
// 声明交换机
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE);
}
// 声明死信队列
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(2);
//死信队列消息过期之后要转发的交换机
args.put("x-dead-letter-exchange", EXCHANGE);
//消息过期转发的交换机对应的key
args.put("x-dead-letter-routing-key", TASK_ROUTING_KEY);
return new Queue(DELAY_QUEUE, true, false, false, args);
}
// 声明死信队列绑定关系
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(delayQueue()).to(exchange()).with(DELAY_ROUTING_KEY);
}
// 声明业务队列
@Bean
public Queue taskQueue() {
return new Queue(TASK_QUEUE_NAME, true);
}
//声明业务队列绑定关系
@Bean
public Binding taskBinding() {
return BindingBuilder.bind(taskQueue()).to(exchange()).with(TASK_ROUTING_KEY);
}
}
消息生产者
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
//orderId 是订单id interval是自定义过期时间 单位:秒
public void orderDelay(String orderId,Long interval) {
MessageProperties messageProperties = new MessageProperties();
//设置消息过期时间
messageProperties.setExpiration(String.valueOf(interval));
Message message = new Message(orderId.getBytes(), messageProperties);
//生产者将消息发给死信队列,并设置消息过期时间
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, null, message);
}
}
消息消费者
@Component
public class Consumer {
@Autowired
private OrderService orderService;
//监听业务队列
@RabbitListener(queues = RabbitMQConfig.TASK_QUEUE_NAME)
public void receiveTask(Message message){
String orderId = new String(message.getBody());
log.info("过期的任务Id:{}", orderId);
Order order = orderService.getById(orderId);
//如果订单支付状态仍为未支付
if(order.getPayState()==0){
//设置该订单状态为已关闭
order.setPayState(2);
orderService.updateById(order);
}
}
}
09 RabbitMQ 的应用场景
9.1 解耦
场景说明:用户下单之后,订单系统要通知库存系统修改商品数量
9.2 异步
场景说明:用户注册成功之后,需要发送注册邮件及注册短信提醒
9.3 消息通信
场景说明:应用系统之间的通信,例如聊天室
9.4 流量削峰
场景说明:秒杀业务。大量的请求不会主动请求秒杀业务,而是存放在消息队列。
微信公众号:eclipse编程。专注于编程技术分享,坚持终身学习。