RabbitMQ是一个消息中间件,在一些需要异步处理、发布/订阅等场景的时候,使用RabbitMQ可以完成我们的需求。 下面是我在学习java语言实现RabbitMQ(自RabbitMQ官网的Tutorials)的一些记录。

首先有三个名称了解一下(以下图片来自rabbitMQ官网)

  • producer是用户应用负责发送消息

  • queue是存储消息的缓冲(buffer)

  • consumer是用户应用负责接收消息

下面是我使用rabbitMQ原生的jar包做的测试方法

maven pom.xml 加入

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco; color: #4e9192 }
p.p2 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
span.s1 { color: #009193 }
span.s2 { color: #4e9192 }
span.s3 { color: #000000 }
span.s4 { text-decoration: underline; color: #000000 }
span.Apple-tab-span { white-space: pre }

<dependency>

    <groupId>com.rabbitmq</groupId>

    <artifactId>amqp-client</artifactId>

    <version>3.5.6</version>

</dependency>

方法实现示意图

 

发送消息方法(Send.java)

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 
 5 public class Send {
 6      
 7     private static final String QUEUE_NAME = "hello";
 8  
 9     public static void main(String[] args) throws Exception {
10         ConnectionFactory factory = new ConnectionFactory();
11         factory.setHost("192.168.1.7");
12         factory.setPort(5672);
13         factory.setUsername("admin");
14         factory.setPassword("admin");
15         Connection connection = factory.newConnection();
16         Channel channel = connection.createChannel();
17  
18         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
19         String message = "Hello World!";
20         // "" 表示默认exchange
21         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
22         System.out.println(" [x] Sent '" + message + "'");
23  
24         channel.close();
25         connection.close();
26     }
27  
28 }

10~16行 是获取rabbitmq.client.Channel, rabbitMQ的API操作基本都是通过channel来完成的。

18行 channel.queueDeclare(QUEUE_NAME, false, false, false, null),这里channel声明了一个名字叫“hello”的queue,声明queue的操作是幂等的,也就是说只有不存在相同名称的queue的情况下才会创建一个新的queue。

21行 channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes()),chaneel在这个queue里发布了消息(字节数组)。

24~25行 则是链接的关闭,注意关闭顺序就好了。

接受消息方法 (Recv.java)

 1 import com.rabbitmq.client.AMQP;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 import com.rabbitmq.client.Consumer;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 
 9 import java.io.IOException;
10 
11 public class Recv {
12 
13   private final static String QUEUE_NAME = "hello";
14 
15   public static void main(String[] argv) throws Exception {
16     ConnectionFactory factory = new ConnectionFactory();
17     factory.setHost("192.168.1.7");
18     factory.setPort(5672);
19     factory.setUsername("admin");
20     factory.setPassword("admin");
21     Connection connection = factory.newConnection();
22     Channel channel = connection.createChannel();
23 
24     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
26 
27     Consumer consumer = new DefaultConsumer(channel) {
28       @Override
29       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
30           throws IOException {
31         String message = new String(body, "UTF-8");
32         System.out.println(" [x] Received '" + message + "'");
33       }
34     };
35     channel.basicConsume(QUEUE_NAME, true, consumer);
36   }
37 }

16~22行 和Send类中一样,也是获取同一个rabbitMQ服务的channel,这也是能接受到消息的基础。

24行 同样声明了一个和Send类中发布的queue相同的queue。

27~35行 DefaultConsumer类实现了Consumer接口,由于推送消息是异步的,因此在这里提供了一个callback来缓冲接受到的消息。

先运行Recv 然后再运行Send,就可以看到消息被接受输出到控制台了,如果多启动几个Recv,会发现消息被每个消费者按顺序分别消费了,

这也就是rabbitMQ默认采用Round-robin dispatching(轮询分发机制)。

 

Work queues

上面简单的实现了rabbitMQ消息的发送和接受,但是无论Send类中的queueDeclare 、basicPublish方法还有Recv类中的basicConsume方法都有很多的参数,

下面我们分析一下几个重要的参数。

(一)Message acknowledgment 消息答复

上面Recv.java的第35行中,channel.basicConsume(QUEUE_NAME, true, consumer),

在Channel接口中定义为 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

这个autoAck我们当前实现为true,表示服务器会自动确认ack,一旦RabbitMQ将一个消息传递到consumer,它马上会被标记为删除状态。

这样如果consumer在正常执行任务过程中,一旦consumer服务挂了,那么我们就永远的失去了这个consumer正在处理的所有消息。

为了防止这种情况,rabbitMQ支持Message acknowledgment,当消息被一个consumer接受并处理完成后,consumer发送给rabbitMQ一个回执,然后rabbitMQ才会删除这个消息。

当一个消息挂了,rabbitMQ会给另外可用的consumer继续发送上个consumer因为挂了而没有处理成功的消息。

因此我们可以设置autoAck=false,来显示的让服务端做消息成功执行的确认。

(二)Message durability 消息持久化

Message acknowledgment 确保了consumer挂了的情况下,消息还可以被其他consumer接受处理,但是如果rabbitMQ挂了呢?

在声明队列的方法中,Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

durable=true 意味着该队列将在服务器重启后继续存在。Send和Recv两个类中声明队列的方法都要设置durable=true。

现在,我们需要将消息标记为持久性——通过将MessageProperties(它实现BasicProperties)设置为PERSISTENT_TEXT_PLAIN

(三)Fair dispatch 公平分发

rabbitMQ默认是轮询分发,这样对多个consumer而言,可能就会出现负载不均衡的问题,无论是任务本身难易度,还是consumer处理能力的不同,都是导致这种问题。

为了处理这种情况我们可以使用basicQos方法来设置prefetchCount = 1。 这告诉rabbitMQ一次只给consumer一条消息,换句话来说,就是直到consumer发回ack,然后再向这个consumer发送下一条消息。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

正是因为Fair dispatch是基于ack的,所有它最好和Message acknowledgment同时使用,否则在autoAck=true的情况下,单独设置Fair dispatch并没有效果。

下面是本人测试以上三种情况的测试代码,可以直接使用。

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
     
    private static final String QUEUE_NAME = "task_queue";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.7");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        boolean durable = true;    //消息持久化
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 多个消息使用空格分隔 Scanner sc
= new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

  private final static String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 
    // basicQos方法来设置prefetchCount = 1。 这告诉RabbitMQy一次只给worker一条消息,换句话来说,就是直到worker发回ack,然后再向这个worker发送下一条消息。
    channel.basicQos(1);
 
    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
 
        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    // 当consumer确认收到某个消息,并且已经处理完成,RabbitMQ可以删除它时,consumer会向RabbitMQ发送一个ack(nowledgement)。
    boolean autoAck = true;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

    protected static void doWork(String message) throws InterruptedException {
        for (char ch: message.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

 

发布/订阅(Publish/Subscribe)

一个完整的rabbitMQ消息模型是会有Exchange的。

rabbitMQ的消息模型的核心思想是producer永远不会直接发送任何消息到queue中,实际上,在很多情况下producer根本不知道一条消息是否被发送到了哪个queue中。

在rabbitMQ中,producer仅仅将消息发送到一个exchange中。要理解exchange也非常简单,它一边负责接收producer发送的消息, 另一边将消息推送到queue中。

exchange必须清楚的知道在收到消息之后该如何进行下一步的处理,比如是否应该将这条消息发送到某个queue中? 还是应该发送到多个queue中?还是应该直接丢弃这条消息等。

exchange模型如下:

exchange类型也有好几种:directtopicheaders以及fanout。

Fanout exchange

下面我们来创建一个fanout类型的exchange,顾名思义,fanout会向所有的queue广播所有收到的消息。

 1 import java.io.IOException;
 2 import java.util.Scanner;
 3 import java.util.concurrent.TimeoutException;
 4 
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 
 9 import rabbitMQ.RabbitMQTestUtil;
10 
11 public class EmitLog {
12      
13     private static final String EXCHANGE_NAME = "logs";
14  
15     public static void main(String[] argv) throws IOException, TimeoutException {
16  
17         ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
18         Connection connection = factory.newConnection();
19         Channel channel = connection.createChannel();
20         
21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
22  
23         // 多个消息使用空格分隔
24         Scanner sc = new Scanner(System.in);
25         String[] splits = sc.nextLine().split(" ");
26         for (int i = 0; i < splits.length; i++) {
27                  channel.basicPublish(EXCHANGE_NAME, "", null, splits[i].getBytes());
28              System.out.println(" [x] Sent '" + splits[i] + "'");
29         }
30  
31         channel.close();
32         connection.close();
33     }
34 }

 

 1 import java.io.IOException;
 2 
 3 import com.rabbitmq.client.AMQP;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 import com.rabbitmq.client.Consumer;
 8 import com.rabbitmq.client.DefaultConsumer;
 9 import com.rabbitmq.client.Envelope;
10 
11 import rabbitMQ.RabbitMQTestUtil;
12 
13 public class ReceiveLogs {
14 
15     private static final String EXCHANGE_NAME = "logs";
16      
17       public static void main(String[] argv) throws Exception {
18         ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
19           Connection connection = factory.newConnection();
20           Channel channel = connection.createChannel();
21      
22         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
23         String queueName = channel.queueDeclare().getQueue();
24         channel.queueBind(queueName, EXCHANGE_NAME, "");
25      
26         Consumer consumer = new DefaultConsumer(channel) {
27           @Override
28           public void handleDelivery(String consumerTag, Envelope envelope,
29                                      AMQP.BasicProperties properties, byte[] body) throws IOException {
30             String message = new String(body, "UTF-8");
31             System.out.println(" [x] Received '" + message + "'");
32           }
33         };
34         channel.basicConsume(queueName, true, consumer);
35       }
36 }

Direct exchange

在fanout的exchange类型中,消息的发布已经队列的绑定方法中,routingKey参数都是默认空值,因为fanout类型会直接忽略这个值,

但是在其他exchange类型中它拥有很重要的意义,

      

rabbitMQ支持以上两种绑定,消息在发布的时候,会指定一个routing key,而图一中exchange会把routing key为orange发送的消息将会被路由到queue Q1中,使用routing key为black或者green的将会被路由到Q2中。

将多个queue使用相同的binding key进行绑定也是可行的。可以在X和Q1中间增加一个routing key black。 它会向所有匹配的queue进行广播,使用routing key为black发送的消息将会同时被Q1Q2接收。

 下面是我测试debug和error两种routing key发布消息并接受处理消息的代码:

import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import rabbitMQ.RabbitMQTestUtil;

public class EmitLog {
     
    private static final String EXCHANGE_NAME = "direct_logs";
 
    public static void main(String[] argv)
                  throws java.io.IOException, TimeoutException {
 
            ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
            Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
 
        // 多个消息使用空格分隔
        Scanner sc = new Scanner(System.in);
        String[] splits = sc.nextLine().split(" ");
        for (int i = 0; i < splits.length; i++) {
                 channel.basicPublish(EXCHANGE_NAME, splits[i], null, splits[i].getBytes());
             System.out.println(" [x] Sent '" + splits[i] + "'");
        }
 
        channel.close();
        connection.close();
    }
}

View Code

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import rabbitMQ.RabbitMQTestUtil;

public class ReceiveLogsDebug {

    private static final String EXCHANGE_NAME = "direct_logs";
     
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();
     
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "debug");
     
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
}

View Code

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import rabbitMQ.RabbitMQTestUtil;

public class ReceiveLogsError {

    private static final String EXCHANGE_NAME = "direct_logs";
     
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();
     
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
     
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
}

View Code

发送输入:

debug接受:

error接受:

 

Topic exchange

发送到topic exchange中的消息不能有一个任意的routing_key——它必须是一个使用点分隔的单词列表。单词可以是任意的。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。

routing key的长度限制为255个字节数。

binding key也必须是相同的形式。topic exchange背后的逻辑类似于direct——一条使用特定的routing key发送的消息将会被传递至所有使用与该routing key相同的binding key进行绑定的队列中。 然而,对binding key来说有两种特殊的情况:

  1. *(star)可以代替任意一个单词
  2. #(hash)可以代替0个或多个单词

和Direct exchange差不多,代码就不copy了,有兴趣的直接看看教程http://www.rabbitmq.com/tutorials/tutorial-five-java.html

 

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
span.s1 { color: #7e504f }
span.s2 { color: #931a68 }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
span.s1 { color: #7e504f }
span.s2 { color: #931a68 }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco; color: #0326cc }

版权声明:本文为yxy-ngu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/yxy-ngu/p/8883379.html