Electron NodeJS 订阅和消费RabbitMQ详细笔记
说明:仅记录nodejs如何消费,如何生产并未做记录,因为需求没有用到。
开发环境
操作系统:windows10/windows11
开发工具:Visual Studio Code
Electron:vue-electron 1.0.6
NodeJS:16.14.2
RabbitMQ:3.8.1
说明:我是在KubeSphere的应用商店中安装的RabbitMQ的。在哪安装不重要,只要你安装了RabbitMQ就行。关键是你要获取到RabbitMQ的端口、账号、密码就行,后面会用到。
安装amqplib
NodeJS如果要和RabbitMQ通信,amqplib是最好的选择。npm安装amqplib:
npm i amqplib
封装RabbitMQ类
对使用RabbitMQ的代码进行封装,方便使用。
以下是关于导入amqplib包的介绍:
如果你准备使用Promise的方式处理连接、创建通道,那么你需要这么导入包: var amqp = require(‘amqplib’); 否则这么导入: var amqp = require(‘amqplib/callback_api’); 我没有用Promise,所以使用的第二种方式。因为我在做断线重连的时候,使用Promise方式没整成功,所以就没用Promise了。
咱做技术的肯定要方方面面都要考虑到,才能保证系统的稳定和高可用。很多网上资料都是简单的介绍了下使用方法,断线重连都没介绍。所以基于socket的通信,断线重连和粘包都是必须要考虑的。扯远了。。。
下面直接贴代码吧:
/* eslint-disable */ var amqp = require('amqplib/callback_api') /** * 封装RabbitMQ的操作类 */ class RabbitMQ { // 构造方法 constructor(options) { if (!options) { this.rabbitmqSettings = { protocol: 'amqp', hostname: '192.168.3.101', port: 31802, username: 'admin', password: 'password' }; } this.connection = undefined; } // 接收信息 receiveQueueMsg(exchangeName, queueName, routingKey, callBack) { amqp.connect(this.rabbitmqSettings, (err, conn) => { console.log('开始创建连接'); if (err) { console.log('创建连接失败 ', err) setTimeout(() => { this.receiveQueueMsg(exchangeName, queueName, routingKey, callBack); }, 3000); return; } console.log('创建连接成功'); this.connection = conn; conn.on('error', (err) => { console.log('connect_error ' + err.message, err) setTimeout(() => { this.receiveQueueMsg(exchangeName, queueName, routingKey, callBack); }, 3000); }) conn.createChannel(function (err, channel) { console.log("创建通道", err); channel.deleteQueue(queueName); // 接收端 channel.assertQueue(queueName, { durable: true, exclusive: false, autoDelete: false }, false); channel.bindQueue(queueName, exchangeName, routingKey); // 接收端,ack表示通知RabbitMQ确认收到 channel.consume(queueName, function (msg) { callBack && callBack(msg); channel.ack(msg); }, { noAck: false }); }); }); } //关闭连接 close() { this.connection.close(); console.log("页面关闭,连接断开。"); } } export default RabbitMQ
使用RabbitMQ封装类
1.导入封装的RabbitMQ类
import RabbitMQ from ‘@/utils/rabbitmq’
2.使用封装的RabbitMQ类
export default { data () { return { mq: undefined } }, methods: { close() { this.mq.close(); }, open() { this.mq = new RabbitMQ(); this.mq.receiveQueueMsg('direct_exchange', 'queue_name_test_3', "message", (msg) => { if(msg !== null) { console.log(msg.content.toString()); } }); } } }
页面上有2个按钮,一个是打开,一个是关闭,这里就不贴代码了。
代码写的有些瑕疵啊,比如在关闭mq之前,先判断下mq是否存在,如果存在才能关闭。读者自己可以补充。
学习资料,务必要看
重要!重要!重要!
重要的事情说三遍!!!https://amqp-node.github.io/amqplib/channel_api.html这个地址非常非常的重要,务必要收藏。里面有nodejs使用rabbitmq的各种API。是百度不到的,还是俺VPN翻到的。
代码介绍
上面只是贴了代码,但没对代码做解释,不够完美,下面补上。
构造方法
RabbitMQ封装类有个构造方法:constructor,它有个参数:options。也就是说创建这个类的实例的时候,需要传入options。它是一个对象,如果没传,那么在构造方法里面重新定义这个对象。主要包含:通信协议、IP地址、端口、rabbitmq的用户名和密码。我准备后期从配置信息读取这些值。做成可配置的。
构造方法里面还有个connection字段,它是用来存放与rabbitmq连接对象的。主动关闭与rabbitmq的连接时会用到。
接收消息方法
先说需求:每个消费者都要完整消费数据。什么意思呢?比如生产者产生10条数据,每个消费者都要消费这10条。而不是将数据平均到每个消费者消费。
方法receiveQueueMsg(exchangeName, queueName, routingKey, callBack),有四个参数:
1.exchangeName:交换机名称,rabbitmq使用交换机“分发”或者“路由”生产者产生的数据到不同的通道。生产者和消费者的exchangeName保持一致,且生产时声明的exchange类型必须为:direct。
2.queueName:通道名称,每个消费者的queueName可以相同,也可以不相同。但是我的需求就要求每个queueName必须不同。否则无法完整消费数据。
3.routingKey:第1点有提到交换机“分发”生产者产生的数据到不同的通道,那么这一点rabbitmq是怎么做到的呢?就是通过routingKey绑定exchangeName和queueName的关系来实现的。绑定之后一个routingKey对应一个exchangeName和多个queueName。我们在生产和消费的时候指定routingKey就可以了。
4.callBack:这个好理解,收到rabbitmq消息后的回调方法。
amqp.connect()
用来连接rabbitmq。它的第一个参数既可以是一个url,也可以是一个对象。比如:
amqp.connect(‘amqp://admin:password@192.168.3.101:31802’, function (err, conn) {});
或者
amqp.connect(raabitmqSettings, function (err, conn) {});
conn.createChannel()
rabbitmq连接成功后,用来创建消息通道。
channel.deleteQueue(queueName)
创建通道之后,根据消息队列名称删除之前的队列,需求就这样。
假如客户端一开始是关着的,消息队列中产生了很多数据,如果不删除队列,当我打开客户端时我会获取很多垃圾数据。而最新的一条数据才是我需要的。
channel.assertQueue()
用来声明或者说是定义消息队列。
channel.assertQueue(queueName, { durable: true, exclusive: false, autoDelete: false }, false);
第一个参数queueName:表示消息队列名称。
第二个参数是个对象:durable表示队列是否持久化,注意不是消息是队列;exclusive表示是否排他,如果为true别的连接(connection)看不到它;autoDelete如果为true,当消费者数量降至零时,队列将被删除(默认为false)。
channel.bindQueue()
用来绑定交换机、消息队列、路由,前面也有提到
channel.bindQueue(queueName, exchange, routingKey);
第一个参数queueName:表示消息队列名称
第二个参数exchange:表示交换机名称
第三个参数routingKey:表示路由名称
关闭连接方法
close()
关闭连接方法处也有瑕疵,需要先判断连接对象是否存在,如果存在才允许关闭。后面读者可以自己修改完善。
断线重连
主要是在“首次连接失败”和“连接断开”时,3秒后重新发起连接。
其它
类文件顶部有/* eslint-disable */这么一行字符,目的是告诉编译器忽略eslint检查。
生产端我用的是.net core操作rabbitmq的,demo可以移步到码云:https://gitee.com/subendong/RabbitMQ.Test。这个demo里的producer的配置有几点需要注意:
1. 无需创建队列
2. 无需绑定交换机、路由、队列,由消费者绑定
3. 只需定义交换机,指定交换机类型为:direct