在上一篇博客《RabbitMQ入门:发布/订阅(Publish/Subscribe)》中,我们认识了fanout类型的exchange,它是一种通过广播方式发送消息的路由器,所有和exchange建立的绑定关系的队列都会接收到消息。但是有一些场景只需要订阅到一部分消息,这个时候就不能使用fanout 类型的exchange了,这个就引出来今天的“猪脚”–Direct Exchange,通过Routing Key来决定需要将消息发送到哪个或者哪些队列中。

接下来请收看详细内容:

  1. Direct Exchange(直接路由器)
  2. 多重绑定
  3. 代码实例

一、Direct Exchange(直接路由器)

在上文中介绍exchange的时候,对direct exchange进行了简单介绍,它是一种完全按照routing key(路由关键字)进行投递的:当消息中的routing key和队列中的binding key完全匹配时,才进行会将消息投递到该队列中。这里提到了一个routing key和binding key(绑定关键字),是什么东东?

  1. routing key:

     在发送消息的时候,basicPublish的第二个参数就是routing key,由于上次是fanout 类型的exchange 进行广播方式投递,这个字段不会影响投递结果,因此我们这里就传入了“”,但是在direct 类型的exchange中我们就不能传入””了,需要指定具体的关键字。

  2. binding key:

    我们在前文中建立绑定关系的时候,queueBind的第三个参数就是绑定关键字

我们声明direact exchange的时候使用:

二、多重绑定

多个队列相同的绑定键绑定到同一个路由器的情况,我们称之为多重绑定

工作模型为(P代表生产者,X代表路由器,红色的Q代表队列,C代表消费者):

 

三、代码实例

 预备知识了解完了,现在来写个程序感受下。

  1. 生产者
    1. public class LogDirectSender {
    2. // exchange名字
    3. public static String EXCHANGE_NAME = "directExchange";
    4. public static void main(String[] args) {
    5. ConnectionFactory factory = new ConnectionFactory();
    6. factory.setHost("localhost");
    7. Connection connection = null;
    8. Channel channel = null;
    9. try {
    10. // 1.创建连接和通道
    11. connection = factory.newConnection();
    12. channel = connection.createChannel();
    13. // 2.为通道声明direct类型的exchange
    14. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    15. // 3.发送消息到指定的exchange,队列指定为空,由exchange根据情况判断需要发送到哪些队列
    16. String routingKey = "debug";
    17. String msg = " hello rabbitmq, I am " + routingKey;
    18. channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
    19. System.out.println("product send a msg: " + msg);
    20. } catch (IOException e) {
    21. e.printStackTrace();
    22. } catch (TimeoutException e) {
    23. e.printStackTrace();
    24. } finally {
    25. // 4.关闭连接
    26. if (channel != null) {
    27. try {
    28. channel.close();
    29. } catch (IOException e) {
    30. e.printStackTrace();
    31. } catch (TimeoutException e) {
    32. e.printStackTrace();
    33. }
    34. }
    35. if (connection != null) {
    36. try {
    37. connection.close();
    38. } catch (IOException e) {
    39. e.printStackTrace();
    40. }
    41. }
    42. }
    43. }
    44. }

    和上次博客中生产者的区别就是黑字粗体部分:1.路由器类型改为direct 2.消息发布的时候指定了routing key

  2. 消费者
    1. public class LogDirectReciver {
    2. public static void main(String[] args) {
    3. ConnectionFactory factory = new ConnectionFactory();
    4. factory.setHost("localhost");
    5. Connection connection = null;
    6. Channel channel = null;
    7. try {
    8. // 1.创建连接和通道
    9. connection = factory.newConnection();
    10. channel = connection.createChannel();
    11. // 2.为通道声明direct类型的exchange
    12. channel.exchangeDeclare(LogDirectSender.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    13. // 3.创建随机名字的队列
    14. String queueName = channel.queueDeclare().getQueue();
    15. // 4.建立exchange和队列的绑定关系
    16. String[] bindingKeys = { "error", "info", "debug" };
    17. // String[] bindingKeys = { "error" };
    18. for (int i = 0; i < bindingKeys.length; i++) {
    19. channel.queueBind(queueName, LogDirectSender.EXCHANGE_NAME, bindingKeys[i]);
    20. System.out.println(" **** LogDirectReciver keep alive ,waiting for " + bindingKeys[i]);
    21. }
    22. // 5.通过回调生成消费者并进行监听
    23. Consumer consumer = new DefaultConsumer(channel) {
    24. @Override
    25. public void handleDelivery(String consumerTag, Envelope envelope,
    26. com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    27. // 获取消息内容然后处理
    28. String msg = new String(body, "UTF-8");
    29. System.out.println("*********** LogDirectReciver" + " get message :[" + msg + "]");
    30. }
    31. };
    32. // 6.消费消息
    33. channel.basicConsume(queueName, true, consumer);
    34. } catch (IOException e) {
    35. e.printStackTrace();
    36. } catch (TimeoutException e) {
    37. e.printStackTrace();
    38. }
    39. }
    40. }

    和上次博客中消费者的区别就是黑字粗体部分:1.路由器类型改为direct 2.建立绑定关系的时候指定了binding key

  3. 执行消费者,控制台log打印如下:
    1. **** LogDirectReciver keep alive ,waiting for error
    2. **** LogDirectReciver keep alive ,waiting for info
    3. **** LogDirectReciver keep alive ,waiting for debug

    这个消费者我们视为消费者1,它会接收error,info,debug三个关键字的消息。

  4. 将String[] bindingKeys = { “error”, “info”, “debug” };改为String[] bindingKeys = { “error” };,然后再运行一次消费者。控制台log打印如下:
    1. **** LogDirectReciver keep alive ,waiting for error

    这个消费者我们视为消费者2,它只会接收error 关键字的消息。

  5. 执行生产者,然后将String routingKey = “debug”;的值分别改为“info”和”error”,然后分别执行,这样一共执行了三次生产者
    1. 第一次执行:
    2. product send a msg: hello rabbitmq, I am debug
    3. 第二次执行:
    4. product send a msg: hello rabbitmq, I am info
    5. 第三次执行:
    6. product send a msg: hello rabbitmq, I am error
  6. 再次查看两个消费者的控制台log:
    1. 消费者1
    2. **** LogDirectReciver keep alive ,waiting for error
    3. **** LogDirectReciver keep alive ,waiting for info
    4. **** LogDirectReciver keep alive ,waiting for debug
    5. *********** LogDirectReciver get message :[ hello rabbitmq, I am debug]
    6. *********** LogDirectReciver get message :[ hello rabbitmq, I am info]
    7. *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
    8. 消费者2
    9. **** LogDirectReciver keep alive ,waiting for error
    10. *********** LogDirectReciver get message :[ hello rabbitmq, I am error]

     

  7. 查看RabbitMQ管理页面

    exchanges标签页里面多了个direct类型的路由器。进入详细页面:

    有4个绑定关系,其中三个的队列是同一个。切换到Queues标签页:

    有4个临时队列。

  8. 如果关掉消费者1和消费者2,会发现队列自动删除了,绑定关系也不存在了。

版权声明:本文为sam-uncle原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/sam-uncle/p/9209666.html