消息队列-一篇读懂rabbitmq(生命周期,confirm模式,延迟队列,集群)
什么是消息队列?
就是生产者生产一条消息,发送到这个rabbitmq,消费者连接rabbitmq并且进行消费,生产者和消费者并需要知道对方是如何工作的,从而实现程序之间的解耦,异步和削峰,这也就是消息队列的作用。
使用的场景也有很多,比如用户支付购买之后的发送短信,增加用户积分等等,只要能将业务逻辑抽象出来,就能很好得使用它。
下面进入正题:
先来介绍一下基本概念和参与生命周期的各个成员。
publisher:消息生产者,负责创建消息,并发送到代理服务器(rabbitmq)
message:发送的消息,由 有效负载(payload) 和 标签 (label) 组成
exchange:交换器,负责接收消息并路由给服务器的队列
queue:消息队列,就是消息最后要去的地方。然后等待消费者取走并消费
consumer:消息消费者,与生产者对应,程序的另外一方,负责消费信息,并完成相应的业务逻辑
channel:信道,在tcp之上建立的通道,负责传送消息。队列的传输都是基于信道来完成的。
broker:消息队列服务器实体
下面来解析一下这张图,这张图是网上找的,虽然不够详细,但是勉强能用。
准备前提,开启rabbitmq服务,生命队列和交换器,并将两者进行绑定。
生产逻辑:
publisher 通过 broker服务器ip 和 端口 尝试建立与 broker 的 tcp 连接,连接成功之后,会尝试验证验证用户名和密码,如果错误,则拒绝访问。
验证成功之后,在tcp上建立一个 信道(channel) ,publisher 通过信道,发送消息到 broker,进入指定的虚拟主机virtual host。然后查找到交换器绑定的队列,并将消息推送进入队列。
消费逻辑:
consumer 建立连接和验证和生产者一样。接下来通过队列名,直接找到队列进行监听并消费。
有几个比较重要的知识点:
1. rabbitmq的交换器并不是真正意义的交换器,它本质上其实就是一张表,里面存放和交换器名称和消息队列的映射关系。所以说队列的传输都是通过信道来完成的。
2. 信道存在的意义在于 创建和销毁tcp连接非常消耗资源
交换器的类型
当生产的消息进入虚拟主机时,会去寻找一张表,就是交换器和队列映射关系的一张表。而交换器的类型也分了好多种,可以根据不同的场景自由选择。
目前总共分了4种,direct,fanout,topic,headers。其中headers因为性能问题几乎不在使用,这里就不做过多的讨论。
1.direct
direct是直接,完全匹配,单播的模式。
php简单代码实现:
- //创建队列
- $q = new AMQPQueue($channel);
- $q->setName($q_name);
- $q->setFlags(AMQP_DURABLE); //持久化
- $q->declare();
- //创建交换机对象
- $ex = new AMQPExchange($channel);
- $ex->setName($e_name);
- $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
- $ex->setFlags(AMQP_DURABLE); //持久化
- $ex->declare();
- //绑定交换机与队列,并指定路由键
- $q->bind($e_name, $k_route);
- //生产消息
$ex->publish($message, $k_route)
特性: 当生产消息时,未指定交换器,则会默认使用 (AMQP default) 交换器,然后路由到 和路由键名称相同的队列中去
2.fanout
如果说 direct 是 单对单 的关系,那么 fanout就是单对多的关系,即一个交换器对应多个队列。
fanout交换器不通过路由键路由到队列,而是通过将队列绑定在交换器上,当消息进来时,直接路由到该交换器绑定的队列去。
3.topic
topic交换器通过路由键,将自动匹配允许匹配的队列,相比fanout,更加灵活,不过对架构要求更高。如下图所示
- $e_name = 'logs-exchange'; //交换机名
- $q_name = 'msg-inbox-errors'; //队列名
- //创建队列
- $q = new AMQPQueue($channel);
- $q->setName($q_name);
- $q->setFlags(AMQP_DURABLE); //持久化
- $q->declare();
- //创建交换机对象
- $ex = new AMQPExchange($channel);
- $ex->setName($e_name);
- $ex->setType(AMQP_EX_TYPE_TOPIC); //direct类型
- $ex->setFlags(AMQP_DURABLE); //持久化
- $ex->declare();
- $q->bind($e_name, '*.msg-inbox');
- $ex->publish($message, 'wonima.msg-inbox');
高级特性:
为了确保消息可靠性,有两种处理方式.
1.rabbitmq事务
事务主要是对信道进行设置,示例代码如下
- $channel->startTransaction(); //开始事务
- for($i=0; $i<5; ++$i){
- $message = "TEST MESSAGE! 测试消息!";
- $message = $message.$i."---";
- echo "Send Message:".$ex->publish($message, 'xxxx')."\n";
- }
- $channel->commitTransaction(); //提交事务
经测验,使用事务之后,性能会造成相当大的影响,与不实用事务相比,性能可以相差百倍以上。
2.confirm 模式
当信道设置未 confirm 模式的时候,每一条消息都会获的唯一的id。当消费者接收到消息的时候,自动发送 或 手动发送消息 进行消息确认。
- //创建队列
- $q = new AMQPQueue($channel);
- $q->setName($q_name);
- $q->setFlags(AMQP_DURABLE); //持久化
- echo "Message Total:".$q->declare()."\n";
- //第一种:自动应答
- //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
//第二种:手动应答- $q->consume('processMessage');
- /**
- * 消费回调函数
- * 处理消息
- */
- function processMessage($envelope, $queue) {
- $msg = $envelope->getBody();
- sleep(2);
- $myfile = fopen("newfile2.txt", "a+") or die("Unable to open file!");
- $txt = $msg.time()."\n";
- fwrite($myfile, $txt);
- fclose($myfile);
- echo $msg.time()."\n"; //处理消息
- $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
- }
应答模式最大的好处是就是异步,执行效率高。事务和应答模式相比,后者使用更加频繁,前者几乎没有见到过。
延迟队列
首先声明rabbitmq是不支持延迟队列的,但是我们可以利用死信队列来完成。
实现延迟队列也有多种方式:
第一种:设置死信队列,并将 过期时间 加到队列里面
- try {
- $conn = new AMQPConnection($connectConfig);
- $conn->connect();
- if (!$conn->isConnected()) {
- echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
- exit();
- }
- $channel = new AMQPChannel($conn);
- if (!$channel->isConnected()) {
- echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
- exit();
- }
- $exchange = new AMQPExchange($channel);
- $exchange->setFlags(AMQP_DURABLE);//持久化
- $exchange->setName($params['exchangeName'] ?: '');
- $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
- $exchange->declareExchange();
- $queue = new AMQPQueue($channel);
- $queue->setName($params['queueName'] ?: '');
- $queue->setFlags(AMQP_DURABLE);
- $queue->setArguments(array(
- 'x-dead-letter-exchange' => 'last_exchange',
- 'x-dead-letter-routing-key' => 'last_route',
- 'x-message-ttl' => 10000,
- ));
- $queue->declareQueue();
- //绑定
- $queue->bind($params['exchangeName'], $params['routeKey']);
- $exchange2 = new AMQPExchange($channel);
- $exchange2->setFlags(AMQP_DURABLE);//持久化
- $exchange2->setName('last_exchange');
- $exchange2->setType(AMQP_EX_TYPE_DIRECT); //direct类型
- $exchange2->declareExchange();
- $queue2 = new AMQPQueue($channel);
- $queue2->setName('last_queue');
- $queue2->setFlags(AMQP_DURABLE);
- $queue2->declareQueue();
- $queue2->bind('last_exchange', 'last_queue');
- } catch (Exception $e) {
- }
- $time = time();
- //生成消息
- $exchange->publish((string)$time, $params['routeKey'], AMQP_MANDATORY, [
- 'delivery_mode' => 2,
- ]);
第二种:设置死信队列,并将 过期时间 加到消息里面,这一种更加自由。
- $msg = [
- 'x-message-ttl' => 5,
- 'ttl' => 5,
- 'body' => time()
- ];
- $msg = json_encode($msg);
- $exchange->publish($msg, '', AMQP_MANDATORY, ['delivery_mode' => 2]);
第三种:使用延迟插件
集群
先来谈谈rabbitmq的集群是如何运行的
当你开启来两个rabbitmq(节点)服务,并将其组成为一个集群。每个节点并不会将所有的队列进行拷贝,元数据依旧保存在单个节点当中,其他节点则是通过指针。
举个例子:节点a和节点b组成了一个集群,节点a保存着一堆元数据 c 和 元数据d的指针,用来指向节点b,节点b保存一堆元数据d 和 元数据 c的指针,用来指向节点a。
这样做有两个原因
1 存储空间 :如果一个节点存储了1gb的数据,再添加节点,只会带来一摸一样的1gb的数据,非常浪费磁盘空间
2 性能:对于持久化消息来说,每一条消息都会触发磁盘io,每次新增节点,网路和磁盘负载都会增加,相对于单机来说,性能不但不会提升,反而可能下降。
但是由于交换器只是一张查询表,并非实际的路由器,因此将交换器在整个集群进行复制也不会损耗太多的性能,所以交换器在每个节点都会保存一份,以便于查询。