AY C# RabbitMQ 2019 微笔记
自己写的.NET和RabbitMQ的文章,希望能帮到大家
参考报告:http://www.rabbitmq.com/dotnet-api-guide.html
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统
AMQP 一个虚拟主机持有一组交换机、队列和绑定
virtual host,虚拟主机
exchange,交换机
queue,队列
binding,绑定
安装
RabbitMQ下载:http://www.rabbitmq.com/download.html
我的版本是3.7.9
安装OTP后,安装兔子
我怕不必要麻烦,把路径的空格都去掉了,直接RS
任务管理器查看服务
查看端口 DOS
netstat -ano
根据端口号查询 PID
netstat -aon|findstr “5672”
这是兔子的默认监听端口5672
根据PID 查占用的端口号
tasklist|findstr “988”
打开任务管理器
结束进程,也可以任务管理器结束,也可以命令
taskkill /f /t /im erl.exe
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
通过web管理兔子
cd D:\RS\rabbitmq_server-3.7.9\sbin
d:
rabbitmq-plugins enable rabbitmq_management
重启服务
net stop RabbitMQ
net start RabbitMQ
打开浏览器 http://localhost:15672/ 账号guest 密码guest
我以前写的基本使用:http://www.ayjs.net/post/363.html
这次命令行创建,先建立一个环境变量
点击确定后,打开命令行,我设置一个字体和颜色和背景了 (* ̄︶ ̄)
查询服务状态
rabbitmqctl status
列举虚拟主机列表
rabbitmqctl list_vhosts
列举用户列表
rabbitmqctl list_users
添加用户和密码
rabbitmqctl add_user ay 123456
设置权限
rabbitmqctl set_permissions ay “.*” “.*” “.*”
分配用户组
rabbitmqctl set_user_tags ay administrator
删除guest用户
rabbitmqctl delete_user guest
刷新网页
修改用户密码 【自己可以修改账户密码】
rabbitmqctl change_password {username} {newpassowrd}
换自己的账号登录
发送消息,生产者 接收消息 消费者 RabbitMQ是Erlang语言开发
队列 Queue 先进先出
prefetchCount限制每次发送给消费者的消息个数,等于1,消息者一个一个处理,在第三篇会细讲,让你清楚为什么有这个属性
ExInclusive 只能被一个连接使用,连接关闭后,消息删除
auto deleted: 当最后一个使用者取消订阅时,删除 大于一个使用者 的队列
Arguments 可选的; 由插件和特定于代理的功能使用,例如消息TTL,队列长度限制等
Priorities 优先级,如果需要优先级队列,我们建议使用1到10之间。目前使用更多优先级将消耗更多资源(Erlang进程)。
一个消息一个 消费者,消费者处理完了,是手动告诉兔子 我处理完了,还是手动告诉兔子,兔子才会把消息删掉。
交换器,我称呼邮箱 Exchange 类似微信的公众号,你关注了Exchange,往这里发消息,订阅公众号的都能收到消息
一个消息 很多消费者,共享这个消息的。
邮箱有类型的,
fanout 广播
model.ExchangeDeclare(ExchangeName, “fanout”);
先连接的消费者都会即时收到消息,后来启动的消费者打开是不会收到之前的。 你理解公众号就好理解了。
Direct 完全匹配的路由key,才会被接收
Topic 模糊匹配路由key,*一个单词,#任意数量字符
Head不管了,不怎么用。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
旧的博客,从这篇往后看 :http://www.ayjs.net/post/366.html
现在使用新的类库的新的写法去实现基本例子,官方是Core或者.NET Framework 4.5.1+
建立2个控制台,分别 引用 rabbitmq.client
使用稳定版5.1
生产者 MQ.Product1
测试 AyTestMQ
DEMO1 Hello Word 以前旧文章
发送方:
var factory = new ConnectionFactory() { HostName = "localhost",UserName="ay",Password="123456",Port=5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit.");
查看下,发送的消息只能byte[] 类型,如果是类,找个二进制序列化
有个hello队列了,这种处理 1个消息只能一个消费者
如果你失败了,你的兔子 队列的 所在的硬盘空间不够了,至少需要50M 的硬盘空间
接收方 最新版本换成事件方式了
需要引入
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
这2个空间了
在AyTestMQ项目里面 写法
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace AyTestMQ { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }
这里Received跟以前不一样了,以前的阻塞方式
测试:
启动生产者
启动消费者
由于设置
所以消费完没有删除,生产消息,也是没有自动删除
打开管理手动删除
=========================加深一些属性的印象====================
重新运行生产端,查看记录
生产时候,持久化false。
重启服务
刷新没有了。
把durable改为true,然后重新生产,
ready是1 总数是1,然后重启兔子服务,刷新web管理,ready是0了,虽然持久化,但是我感觉不能消费了。
进行消费测试
果然没有消息,但是消息下面此刻有个消费者了。
这个消息不是持久化了吗。。我也不太清楚了。 有人说要和exchange挂钩,好了。
特殊测试,一个消息只会被正确处理一次,即使web管理上存在,也是读取不了处理。
===================================
测试自动删除,手动把队列删了
生产者
channel.QueueDeclare(queue: “hello”,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
消费者
channel.QueueDeclare(queue: “hello”,
durable: true,
exclusive: false,
autoDelete: true,
arguments: null);
直接给我崩溃了
我再改
生产者
channel.QueueDeclare(queue: “hello”,
durable: true,
exclusive: false,
autoDelete: true,
arguments: null);
消费者
channel.QueueDeclare(queue: “hello”,
durable: true,
exclusive: false,
autoDelete: true,
arguments: null);
貌似两个改的一样了。哦,我明白了,两个要一致,感觉约定一样。
消费者可以拿到数据了。不关闭控制台,还没执行到using后面那段
打开web管理
queue队列还在,ready已经是0了,
关闭控制台,执行完,释放 连接
刷新web管理
queue队列不在了,已经被删除。
==================
总结,一个队列消费完了,不删除,也不能使用了,感觉就像一个记录存在那。
所以真实场景,删除好点。
上面例子 为了简洁起见,在很大程度上省略了诸如连接管理,错误处理,连接恢复,并发和度量收集之类的事。 这种简化的代码不应被视为生产就绪。
发送消息,生产者 接收消息 消费者 RabbitMQ是Erlang语言开发
实际场景Exchange用的多
1对多发布订阅(下篇讲,这篇让你更了解队列)
==============开始DEMO
2个控制台
发布者2
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ.Product2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: true, arguments: null); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); //channel.QueueDeclare(queue: "hello", // durable: true, // exclusive: false, // autoDelete: true, // arguments: null); //string message = "Hello World!"; //var body = Encoding.UTF8.GetBytes(message); //channel.BasicPublish(exchange: "", // routingKey: "hello", // basicProperties: null, // body: body); //Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } } }
这里基于上一个DEMO改的,这里我们设置了一个properties了。
运行项目。
然后消费者修改代码(基于DEMO1的消费者 代码)
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "task_queue", // durable: true, // exclusive: false, // autoDelete: true, // arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer); //var consumer = new EventingBasicConsumer(channel); //consumer.Received += (model, ea) => //{ // var body = ea.Body; // var message = Encoding.UTF8.GetString(body); // Console.WriteLine(" [x] Received {0}", message); //}; //channel.BasicConsume(queue: "hello", // autoAck: true, // consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }
主要接收消息,处理,模拟耗时工作。
发的消息一个 点号 停顿1秒
生产端消息改下
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(” “, args) : “Hello.World.AY.2019”);
}
消费端改改
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(” [x] Received {0}”, message);
var _3 = message.Split(‘.’);
//int dots = message.Split(‘.’).Length – 1;
foreach (var item in _3)
{
Console.WriteLine(item);
Thread.Sleep(1000);
}
运行生产端,然后消费端效果如下
测试2,
开启生产者,然后开启消费者,如上所示,不要关闭,关掉生产者在打开,消费者那段又收到消息了。
同样的,如果有2个消费者, rabbitmq会发给下一个消费者,这种分发消息叫做 round-robin(循环调度)
一个消息只给一个消费者处理。
场景:其实我们可以做 用户的请求,每个请求放入消息队列,然后让消息队列给空闲的 消费者去消费处理。1个消费者不够处理,可以运行多个来吃完任务。
任务会耗时间的。
您可能想知道如果其中一个消费者开始执行长任务并且仅在部分完成时死亡会发生什么。
上面的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。
在这种情况下,如果当前的消费者挂了,我们将丢失它刚刚处理的消息。
我还将丢失分发给这个消费者的 还未处理的所有消息。
但是我不想丢失任何的消息(1个消息一个任务),如果消费者处理挂了,我当然更想把消息给其他的消费者处理。
为了确保消息永不丢失,RabbitMQ 提供了一个 ack机制, 手动应答,处理完了,告诉兔子,我处理完了,等兔子空闲时候就删除该消息了。
定义 消费者死了,就是 channel关闭,connection关闭,tcp断开了,没网络了。
当消费者还没发送 ack,兔子那边就会认为 消息没有被处理,又会恢复回去了。如果同一时间,还有其他消费者在线,兔子会把这烫手山芋给其他的消费者。
恩,所以啊,你的程序没死,他的消息一直存在兔子那的,除非你手动应答。如果你挂了没应答,会看有没有其他的消费者处理。
接下来模拟这个场景
生产者生产个消息,然后修改消费者代码
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "task_queue", // durable: true, // exclusive: false, // autoDelete: true, // arguments: null); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); throw new NotImplementedException(); Console.WriteLine(" [x] Received {0}", message); var _3 = message.Split('.'); foreach (var item in _3) { Console.WriteLine(item); Thread.Sleep(2000); } Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); };
设置了autoack:false了
然后received里面设置了 ea.DeliveryTag
测试1
这里处理消息,停留了2秒1个字段,我们再BasicAck应答之前关闭程序,看消息会不会被删除了。
由于抛出异常Unacked 为1了。
把程序关了
消息还是删除了。。
我怀疑服务端设置了 自动删除导致的。我改为false测试,这样生产了1个不会自动删除的消息。
测试2
运行修改后的生产者
消费者代码不改,让抛出异常
然后关闭程序,过一会,消息恢复正常了。这次就对了。也就是生产者自动删除我觉得大部分都是关闭的。
测试3
正确处理,看消息会不会删除,移除抛弃异常的代码
ready终于是0了
然后关闭客户端,断开连接(执行完using,释放连接),队列被处理了,没删除哦
那如果想要删除呢,暂时先这样吧,因为用的最多的还是Exchange的Topic
注意:
忘记写 BasicAck这行代码, 这是一个简单的错误,但后果是严重的。 当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。
假如忘了unack
测试4
注释掉代码,然后生产个消息,然后运行消费者
再运行消费者,当然 连接不要释放,不然任务客户端死了,又恢复回去了
这里我们打开命令行
rabbitmqctl list_queues name messages_ready messages_unacknowledged
貌似超时了 这里就列出名字了。算了,遇到再看。
=============================================================
持久性,如果兔子挂了,消息还是会丢丢失了。
hannel.QueueDeclare(queue: “hello”,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
设置持久化,就会不丢失了。但是兔子不允许你重新定义一个已经存在的队列,然后更改属性
你可以换个名字重新定义一个。
对了,如果服务器重启,我们在上篇博客说到 消息恢复了,但是不可再被消费了,但是如果生产消息时候,加上下面代码就好了,终于解决了 durable=true也无效的问题了。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
将消息标记为持久性并不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接s收消息并且尚未保存消息时,仍然有一个短时间窗口。 此外,RabbitMQ不会为每条消息执行fsync(2) – 它可能只是保存到缓存而不是真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列来说已经足够了。 如果您需要更强的保证,那么您可以使用发布者确认(publisher confirms)。
公平调度 Fair Dispatch
2个消费者,一个很忙,一个几乎不做事,兔子不知道谁忙谁不忙的,还是均匀的发消息的。
发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。
它不会查看消费者未确认消息的数量。
它只是盲目地向第n个消费者发送每个第n个消息
为了改变这种行为,我们可以使用BasicQos方法,shezhi PrefetchCount=1
这会告诉兔子,不要同一时间给超过一个消息以上给一个消费者,因为它很忙,可能还没处理完,你又来了。
换句话说,在处理并确认前一个消息之前,不要向该工作程序发送新消息。 相反,它会将它发送给下一个不忙的 消费者。
channel.BasicQos(0, 1, false);
这里注意队列的 size
如果所有的 消费者都很忙,并且你的queue填满了。你就要考虑是否添加更多的消费者,或者换个思路去解决问题。
消费者修改后的代码如下:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); var _3 = message.Split('.'); foreach (var item in _3) { Console.WriteLine(item); } Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
关于IModel内的方法和IBasicProperties你想了解的,可以查看 RabbitMQ .NET client API reference online
特别推荐以下指南
particularly recommend the following guides: Publisher Confirms and Consumer Acknowledgements, Production Checklist and Monitoring.
Exchange 发布订阅 1个生产者对多个消费者
队列是一个存储消息的buffer
对的,会爬要走路了。
生产者代码如下
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ3 { class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello.World!"); } } }
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
运行,
命令行列出rabbitmqctl list_exchanges
amq.*开头的 默认邮箱,安装好RabbitMQ就有的,暂时用不到。
在以前的代码,我们没有用到exchange,但是仍然可以发消息到队列。那是因为我们发到默认邮箱去了,我们给exchange赋值是空白
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
现在我们代码 声明一个 fanout的 邮箱(交换器),取个名字叫logs
channel.ExchangeDeclare(exchange: “logs”, type: “fanout”);
临时队列
前面的代码,我们给队列取名字 hello,task_queue还记得吗?命名后,消费者通过名字拿到队列然后 处理消息的。
在这个例子对于我们的logger不重要,我们只关注log内容,
我们可以通过下面,随机取名字。然后一旦消费者断开连接,消息要能删除。
默认,创建的是非 持久化,exclusive,自动删除的队列
ar queueName = channel.QueueDeclare().QueueName;
exclusive queue独占队列,相当于给队列lock了,别人不能拿到。 一般消费者死了,独占队列会被删除。因此用于特定的某些场景。
你占用了,当别人尝试访问,就会报 RESOURCE_LOCKED的错误异常。表示无法获得对 锁定队列的 独占访问权限。
Binding
队列要放入邮箱才好,放进去叫绑定
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
上面代码我没给队列名字,因为我不关心,你也可以写个 产品约定好的名字 ,或者调用临时队列的知识,随机名字。
列出绑定(命令行)
rabbitmqctl list_bindings
发布消息
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
上面的exchange的邮箱一定要存在,不然发布失败。
如果队列没有绑定邮箱,消息将会丢失,但这对我们没有问题; 因为如果没有消费者在监听,我们可以安全地丢弃该消息。
消费者:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
消费者先打开,然后打开生产者
AY 过程理解:
测试1:
把邮箱都删掉,保持干净环境
我们运行消费者:此时通过web管理,看到多了个logs
对内多了一个amq.genXXXX的名字的queue,
关闭消费者==》》 临时队列删除了,exchange依旧存在。
然后运行生产者,不退出,队列没变化
打开消费者,也无法获得消息。
我们生产个消息,消费者不关闭,立即就可以收到消息了。
总结: 消费者连接了在线才可以收到消息。
(由于我们的代码,释放连接,就关闭消费者程序就行了,意味着 消费者死亡,那么临时队列也会被删除了。 满足条件)
接下来运行多个消费者,然后打开生产者,每个消费者都能正确拿到消息了。
运行多个,会产生多个临时队列,理解OK。
fanout 就是根据exchange名字来拿消息,没啥过滤在里面,但是你也可以 设计名字,来分配业务。
下面会将 可以过滤的topic,header
关于我以前写的fanout教程: 小坦克
Routing 路由 集中处理 数据 然后 按照 约定/规则 正确的 广播到 消费者
在本教程中,我们将为其添加一个功能 – 我们将只能订阅一部分消息。 例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
Direct Exchange 以前我写的博客
我们上一个教程中的日志记录系统向所有消费者广播所有消息。 我们希望扩展它以允许根据消息的严重性过滤消息。 例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。
我们使用的是fanout,它没有给我们太大的灵活性 – 它只能进行无意识的广播。
我们将使用direct。 direct背后的路由算法很简单 – 消息进入队列,binding的key和route key一致就行了。
using System; using System.Linq; using RabbitMQ.Client; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var severity = "info"; var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
direct类型的,然后写个 路由的规则,RouteKey 这里直接 给个名字。
消费者代码:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var queueName = channel.QueueDeclare().QueueName; var severity = "info"; channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
路由key 一致 就接收消息了。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
接下来Topic,直接在这个demo改
using System; using System.Linq; using RabbitMQ.Client; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var routingKey = "anonymous.info"; var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); } Console.ReadKey(); } }
消费者
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var queueName = channel.QueueDeclare().QueueName; var severity = "anonymous.*"; channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: severity); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
跟上面区别是 RouteKey 带了 * 号或者 #号
*号 代表1个单词
#号代表 0个以上的单词
服务端 消息路由规则 anonymous.info
换成 var severity = “a.*”;
肯定收不到消息
换成# 肯定可以
*.info也可以
a# 收不到消息的
*.* 可以收到,然后把生产换成anonymous 就收不到了,因为路由规则 1个单词.第二个单词
以上内容是 AY做过测试了。
========================================================================
讲一下RPC,稍微有点绕,你理解一个既是消费端也是生产者,双方都是的,也可以配合 http请求响应稍微配合理解。
但是也是有点不一样。 我还是把上面代码注释了,还在那2个 控制台改。
场景:
如果我们需要在远程计算机上运行一个函数并等待结果呢? 嗯,这是一个不同的故事。 此模式通常称为Remote Procedure Call 或者 RPC.
在本教程中,我们将使用RabbitMQ构建RPC系统:客户端和可伸缩的RPC服务器。 由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci 斐波那契 数字的虚拟RPC服务
为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为Call的方法,该方法发送一个RPC请求并阻塞,直到收到答案为止。
请求服务器(生产者,返回一个斐波那契数字)
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) => { string response = null; var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } private static int fib(int n) { if (n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }
代码比较好理解的,fib是一个 返回斐波那契数字的,这里不考虑数字是否是正整数了。
自己创建一个接收请求的队列,名字叫rpc_queue,手动应答,处理完成,再应答完成。
然后收到一个消息后,处理,中间有个约定的CorrelationId 写上去,
然后 在往这个RouteKey写上 返回值的一些信息。
消费者:
using System; using System.Collections.Concurrent; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; props.ReplyTo = replyQueueName; consumer.Received += (model, ea) => { var body = ea.Body; var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); } public void Close() { connection.Close(); } } public class Rpc { public static void Main() { var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); Console.ReadLine(); } }
以上的调用没有进行类型判断响应,比如输入的是否合法,如果服务器没有运行等,这里是最简单的调用示例。
客户端是否应该有超时设计等。
比如服务器处理发生异常,是否应该转发回客户端,说没处理好。
此处介绍的设计并不是RPC服务的唯一可能实现,但它具有一些重要优势:
如果RPC服务器太慢,您可以通过运行另一个服务器来扩展。 尝试在新控制台中运行第二个RPCServer。
在客户端,RPC只需要发送和接收一条消息。 不需要像QueueDeclare这样的同步调用。 因此,对于单个RPC请求,RPC客户端只需要一次网络往返。
测试:
先运行 服务端,服务端等待 消费者的请求,然后接收到,处理返回给 消费者。
到此,基本的类库调用 AY讲解完了。
消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。
这种场景: 延迟任务
知识:消息的TTL和死信Exchange
RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。
rabbitmq-delayed-message-exchange这个插件也可以实现,看C#实现。
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。
超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。
所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。
这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
Dead Letter Exchanges
定义消息死亡
1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
2. 上面的消息的TTL到了,消息过期了。
3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
就是个普通的Exchange,存放死掉的消息
两个控制台:
P端(以后生产者只说P了,消费者是 C端):
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ.Product2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) { while (Console.ReadLine() != null) { using (var channel = connection.CreateModel()) { Dictionary<string, object> dic = new Dictionary<string, object>(); dic.Add("x-expires", 10000); dic.Add("x-message-ttl", 8000);//队列上消息过期时间,应小于队列过期时间 dic.Add("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由 dic.Add("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey //创建一个名叫"deadQueue"的消息队列 channel.QueueDeclare(queue: "deadQueue", durable: true, exclusive: false, autoDelete: false, arguments: dic); var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); //向该消息队列发送消息message channel.BasicPublish(exchange: "", routingKey: "deadQueue", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } } } Console.ReadKey(); } } }
运行项目,按任意键,创建1个队列,刷新web管理
然后过了8秒,队列就删除了。
这里队列有个TTL Exp DLX DLK属性,正好上面3个属性的设置
我们也可以在管理端看到这些消息参数:
单击的时候,就可以设置了,
在网上找了一些资料,下面代码可能java的设置,.net简单如上键值就行了
Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));
Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp
Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim
Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,
Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
Master locator(x-queue-master-locator) 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。
(设置“x-queue-master-locator”参数。)
整理下: 1个 Exchange 可以多个 Queue 1个Queue可以多个Message Message可以设置优先级,Queue可以设置最多多少个Message,多大字节的存储,也可以设置优先级。
上次持久化的问题理解:
设置消息持久化必须先设置队列持久化,要不然队列不持久化,消息持久化,队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的。
上面代码,我们看到了,消息过期了,就会被扔到一个叫exchange-direct的 邮箱,真好,那么等他过期了,那么消费者 监听这个邮箱,是不是8秒过后就收到了消息,达到了 延迟的效果。跟RPC的设计异曲同工之妙。如果没有指定邮箱,8秒后消息就会被删除了。 10秒后 队列就会被删除。
接下来,
消费者,就是创建个 被转发的 exchange的监听,
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { Console.Title = "AY 2019 2018-12-5"; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct"); string name = channel.QueueDeclare().QueueName; //string name = "deadQueue"; channel.QueueBind(queue: name, exchange: "exchange-direct", routingKey: "routing-delay"); //回调,当consumer收到消息后会执行该函数 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(ea.RoutingKey); Console.WriteLine(" [x] Received {0}", message); }; //Console.WriteLine("name:" + name); //消费队列"hello"中的消息 channel.BasicConsume(queue: name, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } Console.ReadKey(); } } }
打开生产端,按下回车,进入循环,然后打开消费端,8秒后消费者就收到了消息。
如果8秒后,消息都被转发了,消费者打开,是收不到消息的。
心跳检测,客户端服务端是否断开了。
默认情况下,在3.5.5版本之前,rabbitmq设置的默认与客户端心跳时间为580秒,之后为60秒(如果时间间隔配置为0,则表示不启用heartbeat检测),两者时间会每隔timeout / 2 进行一次心跳互通。
=ERROR REPORT==== 01-Dec-2018::12:38:00 ===
closing AMQP connection <0.909.1> (125.120.18.131:5060 -> 120.27.140.42:5672):
Missed heartbeats from client, timeout: 10s
启用心跳检测后,rabbitmq会为每个tcp连接创建两个进程用于心跳检测(这可以通过rabbitmq.log看到每个客户端确实有两个连接,关闭的时候也是成对的方式),一个进程定时检测tcp连接上是否有数据发送(这里的发送是指rabbitmq发送数据给客户端),如果一段时间内没有数据发送给客户端,则发送一个心跳包给客户端,然后循环进行下一次检测;另一个进程定时检测tcp连接上是否有数据的接收,如果一段时间内没有收到任何数据,则判定为心跳超时,最终会关闭tcp连接。另外,rabbitmq的流量控制机制可能会暂停heartbeat检测。
如果超过2次心跳无响应,则会认为对方不可到达并关闭连接。此时,客户端通常需要重新连接。具体视客户端的不同而不同。
具体可见,小坦克。
在我们的环境中,应该来说负载并不是特别的高,ping 1k的延时基本上都在20ms以内,照理设置10ms的心跳间隔足以,为什么还是会出现,还需要trace_on看下详细的网络包情况。
将心跳超时值设置得太低可能会导致误报(由于瞬时网络拥塞,短暂的服务器流控制等原因,对等体被认为是不可用的,而实际情况并非如此)。 选择超时值时应考虑这一点。
用户和客户端库维护人员几年的反馈意见表明,低于5秒的值很可能导致误报,1秒或更低的值很可能会这样做。 对于大多数环境,5到20秒范围内的值是最佳的。
某些网络工具(HAproxy,AWS ELB)和设备(硬件负载平衡器)可能会在一段时间内没有活动时终止“空闲”TCP连接。 大多数时候这是不可取的。
在连接上启用心跳时,会导致周期性的轻型网络流量。 因此,心跳具有保护客户端连接的副作用,这些客户端连接可以在一段时间内空闲,以防止代理和负载平衡器过早关闭。
心跳超时为30秒时,连接将大约每15秒产生一次定期网络流量。 5到15秒范围内的活动足以满足大多数常用代理和负载平衡器的默认值。 另请参阅上面关于低超时和误报的部分。
由于错过了心跳,RabbitMQ节点将记录连接已关闭。 所有官方支持的客户端库也将如此。 检查服务器和客户端日志将提供有价值的信息,应该是第一个故障排除步骤。
可能需要检查打开或来自节点的连接,其状态,来源,用户名和有效心跳超时值。 “网络故障排除指南”概述了可用于提供帮助的工具。
www.ayjs.net 六安杨洋(AY)
===================================================================
消费者拒收消息写法 channel.BasicReject(ea.DeliveryTag, false);
BasicReject方法第一个参数是消息的DeliveryTag,对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回queue中,requeue。
BasicReject一次只能拒绝接收一个消息,而BasicNack方法可以支持一次0个或多个消息的拒收,并且也可以设置是否requeue。
channel.BasicNack(3, true, false);
QoS = quality-of-service, 顾名思义,服务的质量。通常我们设计系统的时候不能完全排除故障或保证说没有故障,而应该设计有完善的异常处理机制。在出现错误的时候知道在哪里出现什么样子的错误,原因是什么,怎么去恢复或者处理才是真正应该去做的。在接收消息出现故障的时候我们可以通过RabbitMQ重发机制来处理。重发就有重发次数的限制,有些时候你不可能不限次数的重发,这取决于消息的大小,重要程度和处理方式。
甚至QoS是在接收端设置的。发送端没有任何变化,接收端的代码也比较简单,只需要加如下代码:
代码第一个参数是可接收消息的大小的,但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况。第二个参数是处理消息最大的数量。举个例子,如果输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞。如果输入3,那么可以最多有3个消息不应答,如果到达了3个,则发送端发给这个接收方得消息只会在队列中,而接收方不会有接收到消息的事件产生。总结说,就是在下一次发送应答消息前,客户端可以收到的消息最大数量。第三个参数则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的。
这种数量的设置,也为我们在多个客户端监控同一个queue的这种负载均衡环境下提供了更多的选择。
其他一些设置,了解下
后面有些基础不看了,直接要封装库了。
网上一些的封装RabbitMQ 的使用
简洁,参考博客1
运行单元测试的几个总结
1 创建了一个队列,然后设置属性,由于不是过期,自动删除的,所以记录存在,然后我又单元测试,我修改了过期时间,但是还是那个队列名,所以属性对不上,所以报错了, 要不,删除那个队列,要不你把属性还原回去,要不然换个名字。
2 错误2, 今天我的web管理打不开了,因为我的 win10 把我防火墙打开了,我默认都关掉的,当然真实环境要开防火墙的,配置入站出站的端口号就好了。
以上内容是我花了一周编写,主要和.NET一起使用,AY自己编写,开心就转载吧,我没意见。