先决条件
本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表联系我们

(使用.NET客户端)

教程[2]中,我们创建了一个工作队列,假设在工作队列中的每一个任务都只被分发给一个Worker。那么在这一章节,我们要做与之完全不同的事,那就是我们将要把一条消息分发给多个消费者。这种模式被称为“发布/订阅”。

为了说明、体现这种模式,我们将会建一个简单的日志系统。它将会包含两个程序 – 第一个用来发送日志消息,第二个用来接收并打印它们。

在我们建立的日志系统中,每个接收程序的运行副本都会收到消息。这样我们就可以运行一个接收程序接收消息并将日志写入磁盘;同时运行另外一个接收程序接收消息并将日志打印到屏幕上。

实质上,发布的日志消息将会被广播给所有的接收者。

在教程的前几部分,我们是发送消息到队列并从队列中接收消息。现在是时候介绍Rabbit中完整的消息传递模型了。

让我们快速回顾一下前面教程中的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

在RabbitMQ中,消息传递模型的核心理念是生产者从来不会把任何消息直接发送到队列,其实,通常生产者甚至不知道消息是否会被分发到任何队列中。

然而,生产者只能把消息发送给交换器。交换器非常简单,一方面它接收来自生产者的消息,另一方面又会把接收的消息推送到队列中。交换器必须明确知道该如何处理收到的消息,应该追加到一个特定队列中?还是应该追加到多个队列中?或者应该把它丢弃?这些规则都被定义在交换器类型中。

Exchanges

目前有这几种的交换器类型可用:directtopicheadersfanout。我们重点关注最后一个 — fanout,让我们来创建一个这种类型的交换器,将其命名为logs

  1. channel.ExchangeDeclare("logs", "fanout");

fanout类型交换器非常简单。正如您可能从名字中猜出的那样,它会把收到的所有消息广播到它已知的所有队列中。这恰巧是我们的日志系统所需要的。

列举交换器
要列举出服务器上的交换器,您可以使用非常有用的rabbitmqctl命令行工具:

  1. sudo rabbitmqctl list_exchanges

执行上述命令后,出现的列表中将会有一些amq.*交换器和默认(未命名)交换器。这些是默认创建的,不过目前您可能用不到它们。

默认交换器
在教程的前些部分,我们对交换器这一概念还一无所知,但仍然可以把消息发送到队列。之所以这样,是因为我们使用了一个用空字符串("")标识的默认交换器。

回顾一下我们之前如何发布消息:

  1. var message = GetMessage(args);
  2. var body = Encoding.UTF8.GetBytes(message);
  3. channel.BasicPublish(exchange: "",
  4. routingKey: "hello",
  5. basicProperties: null,
  6. body: body);

第一个参数就是交换器的名称,空字符串表示默认或匿名交换器:将消息路由到routingKey指定的队列(如果存在)中。

现在,我们可以把消息发布到我们指定的交换器:

  1. var message = GetMessage(args);
  2. var body = Encoding.UTF8.GetBytes(message);
  3. channel.BasicPublish(exchange: "logs",
  4. routingKey: "",
  5. basicProperties: null,
  6. body: body);

您是否还记得之前我们使用过的队列,它们都有一个特定的名称(记得应该是hellotask_queue吧)。给队列命名对我们来说是至关重要的 — 因为我们可能需要多个Worker指向同一个队列;当您想要在生产者和消费者之间共享队列时,给队列一个名称也是非常重要的。

但是,我们创建的日志系统并不希望如此。我们希望监听所有的日志消息,而不仅仅是其中一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。为解决这个问题,我们需要做好两件事。

首先,我们无论何时连接Rabbit,都需要一个新的、空的队列。要做到这一点,我们可以使用随机名称来创建队列,或许,甚至更好的方案是让服务器为我们选择一个随机队列名称。

其次,一旦我们与消费者断开连接,与之相关的队列应该被自动删除。

在.NET客户端中,如果不向QueueDeclare()方法提供任何参数,实际上就是创建了一个非持久化、独占、且自动删除的随机命名队列:

  1. var queueName = channel.QueueDeclare().QueueName;

您可以在队列指南中了解更多关于exclusive参数和其他队列属性的信息。

此时,queueName包含一个随机队列名称。例如,它看起来可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg

Bindings

我们已经创建好了一个fanout交换器和一个队列。现在我们需要告诉交换器把消息发送到我们的队列。而交换器和队列之间的关系就称之为绑定

  1. // 把一个队列绑定到指定交换器。
  2. channel.QueueBind(queue: queueName,
  3. exchange: "logs",
  4. routingKey: "");

从现在起,logs交换器会把消息追加到我们的队列中。

列举绑定
您可以使用(您或许已经猜到了),列举出现有的绑定。

  1. sudo rabbitmqctl list_bindings

Putting it all together

生产者程序负责分发消息,这与之前的教程看起来没有太大区别。

最重要的变化是我们现在想把消息发布到我们的logs交换器,而不是匿名交换器。在发送时我们需要提供一个路由键routingKey,但是对于fanout交换器,它的值可以被忽略。这里是EmitLog.cs文件的代码:

  1. using System;
  2. using RabbitMQ.Client;
  3. using System.Text;
  4. class EmitLog
  5. {
  6. public static void Main(string[] args)
  7. {
  8. var factory = new ConnectionFactory() { HostName = "localhost" };
  9. using(var connection = factory.CreateConnection())
  10. using(var channel = connection.CreateModel())
  11. {
  12. channel.ExchangeDeclare(exchange: "logs", type: "fanout");
  13. var message = GetMessage(args);
  14. var body = Encoding.UTF8.GetBytes(message);
  15. channel.BasicPublish(exchange: "logs",
  16. routingKey: "",
  17. basicProperties: null,
  18. body: body);
  19. Console.WriteLine(" [x] Sent {0}", message);
  20. }
  21. Console.WriteLine(" Press [enter] to exit.");
  22. Console.ReadLine();
  23. }
  24. private static string GetMessage(string[] args)
  25. {
  26. return ((args.Length > 0)
  27. ? string.Join(" ", args)
  28. : "info: Hello World!");
  29. }
  30. }

EmitLog.cs源码)

如你所见,在建立连接后,我们声明了交换器。这一步非常有必要,因为发布消息到一个不存在的交换器,这种情况是被禁止的。

如果没有队列绑定到交换器上,消息将会丢失,但这对我们来说并没有什么没问题;如果没有消费者正在监听,我们是可以放心地把消息丢弃的。

ReceiveLogs.cs的代码:

  1. using System;
  2. using RabbitMQ.Client;
  3. using RabbitMQ.Client.Events;
  4. using System.Text;
  5. class ReceiveLogs
  6. {
  7. public static void Main()
  8. {
  9. var factory = new ConnectionFactory() { HostName = "localhost" };
  10. using(var connection = factory.CreateConnection())
  11. using(var channel = connection.CreateModel())
  12. {
  13. channel.ExchangeDeclare(exchange: "logs", type: "fanout");
  14. var queueName = channel.QueueDeclare().QueueName;
  15. channel.QueueBind(queue: queueName,
  16. exchange: "logs",
  17. routingKey: "");
  18. Console.WriteLine(" [*] Waiting for logs.");
  19. var consumer = new EventingBasicConsumer(channel);
  20. consumer.Received += (model, ea) =>
  21. {
  22. var body = ea.Body;
  23. var message = Encoding.UTF8.GetString(body);
  24. Console.WriteLine(" [x] {0}", message);
  25. };
  26. channel.BasicConsume(queue: queueName,
  27. autoAck: true,
  28. consumer: consumer);
  29. Console.WriteLine(" Press [enter] to exit.");
  30. Console.ReadLine();
  31. }
  32. }
  33. }

ReceiveLogs.cs源码)

按照教程[1]中的设置说明生成EmitLogsReceiveLogs项目。

如果您想把日志保存到文件中,只需打开一个控制台并输入:

  1. cd ReceiveLogs
  2. dotnet run > logs_from_rabbit.log

如果你想在屏幕上看到日志,我可以新开一个终端并运行:

  1. cd ReceiveLogs
  2. dotnet run

当然,分发日志需要输入:

  1. cd EmitLog
  2. dotnet run

使用rabbitmqctl list_bindings命令,您可以验证代码是否真正创建了我们想要的绑定和队列。当有两个ReceiveLogs.cs程序运行时,您应该看到如下所示的内容:

  1. sudo rabbitmqctl list_bindings
  2. # => Listing bindings ...
  3. # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
  4. # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
  5. # => ...done.

对执行结果的解释简洁明了:来自logs交换器的数据转发到了两个由服务器随机分配名称的队列。这正是我们期待的结果。

想要了解如何监听消息的这一块内容,让我们继续阅读教程[4]

本文翻译自RabbitMQ官方教程C#版本。本文介绍如与官方有所出入,请以官方最新内容为准。

水平有限,翻译的不好请见谅,如有翻译错误还请指正。

版权声明:本文为esofar原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/esofar/p/rabbitmq-publish-subscribe.html