RabbitMQ基础入门
RabbitMQ是一个消息中间件,在一些需要异步处理、发布/订阅等场景的时候,使用RabbitMQ可以完成我们的需求。 下面是我在学习java语言实现RabbitMQ(自RabbitMQ官网的Tutorials)的一些记录。
首先有三个名称了解一下(以下图片来自rabbitMQ官网)
-
producer
是用户应用负责发送消息
-
queue
是存储消息的缓冲(buffer)
-
consumer
是用户应用负责接收消息
下面是我使用rabbitMQ原生的jar包做的测试方法
maven pom.xml 加入
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco; color: #4e9192 }
p.p2 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
span.s1 { color: #009193 }
span.s2 { color: #4e9192 }
span.s3 { color: #000000 }
span.s4 { text-decoration: underline; color: #000000 }
span.Apple-tab-span { white-space: pre }
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.6</version>
</dependency>
方法实现示意图
发送消息方法(Send.java)
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 5 public class Send { 6 7 private static final String QUEUE_NAME = "hello"; 8 9 public static void main(String[] args) throws Exception { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("192.168.1.7"); 12 factory.setPort(5672); 13 factory.setUsername("admin"); 14 factory.setPassword("admin"); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 19 String message = "Hello World!"; 20 // "" 表示默认exchange 21 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 22 System.out.println(" [x] Sent '" + message + "'"); 23 24 channel.close(); 25 connection.close(); 26 } 27 28 }
10~16行 是获取rabbitmq.client.Channel, rabbitMQ的API操作基本都是通过channel来完成的。
18行 channel.queueDeclare(QUEUE_NAME, false, false, false, null),这里channel声明了一个名字叫“hello”的queue,声明queue的操作是幂等的,也就是说只有不存在相同名称的queue的情况下才会创建一个新的queue。
21行 channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes()),chaneel在这个queue里发布了消息(字节数组)。
24~25行 则是链接的关闭,注意关闭顺序就好了。
接受消息方法 (Recv.java)
1 import com.rabbitmq.client.AMQP; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 9 import java.io.IOException; 10 11 public class Recv { 12 13 private final static String QUEUE_NAME = "hello"; 14 15 public static void main(String[] argv) throws Exception { 16 ConnectionFactory factory = new ConnectionFactory(); 17 factory.setHost("192.168.1.7"); 18 factory.setPort(5672); 19 factory.setUsername("admin"); 20 factory.setPassword("admin"); 21 Connection connection = factory.newConnection(); 22 Channel channel = connection.createChannel(); 23 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 26 27 Consumer consumer = new DefaultConsumer(channel) { 28 @Override 29 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 30 throws IOException { 31 String message = new String(body, "UTF-8"); 32 System.out.println(" [x] Received '" + message + "'"); 33 } 34 }; 35 channel.basicConsume(QUEUE_NAME, true, consumer); 36 } 37 }
16~22行 和Send类中一样,也是获取同一个rabbitMQ服务的channel,这也是能接受到消息的基础。
24行 同样声明了一个和Send类中发布的queue相同的queue。
27~35行 DefaultConsumer
类实现了Consumer
接口,由于推送消息是异步的,因此在这里提供了一个callback来缓冲接受到的消息。
先运行Recv 然后再运行Send,就可以看到消息被接受输出到控制台了,如果多启动几个Recv,会发现消息被每个消费者按顺序分别消费了,
这也就是rabbitMQ默认采用Round-robin dispatching(轮询分发机制)。
Work queues
上面简单的实现了rabbitMQ消息的发送和接受,但是无论Send类中的queueDeclare 、basicPublish方法还有Recv类中的basicConsume方法都有很多的参数,
下面我们分析一下几个重要的参数。
(一)Message acknowledgment 消息答复
上面Recv.java的第35行中,channel.basicConsume(QUEUE_NAME, true, consumer),
在Channel接口中定义为 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
这个autoAck我们当前实现为true,表示服务器会自动确认ack,一旦RabbitMQ将一个消息传递到consumer,它马上会被标记为删除状态。
这样如果consumer在正常执行任务过程中,一旦consumer服务挂了,那么我们就永远的失去了这个consumer正在处理的所有消息。
为了防止这种情况,rabbitMQ支持Message acknowledgment,当消息被一个consumer接受并处理完成后,consumer发送给rabbitMQ一个回执,然后rabbitMQ才会删除这个消息。
当一个消息挂了,rabbitMQ会给另外可用的consumer继续发送上个consumer因为挂了而没有处理成功的消息。
因此我们可以设置autoAck=false,来显示的让服务端做消息成功执行的确认。
(二)Message durability 消息持久化
Message acknowledgment 确保了consumer挂了的情况下,消息还可以被其他consumer接受处理,但是如果rabbitMQ挂了呢?
在声明队列的方法中,Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
durable=true 意味着该队列将在服务器重启后继续存在。Send和Recv两个类中声明队列的方法都要设置durable=true。
现在,我们需要将消息标记为持久性——通过将MessageProperties(它实现BasicProperties)设置为PERSISTENT_TEXT_PLAIN
(三)Fair dispatch 公平分发
rabbitMQ默认是轮询分发,这样对多个consumer而言,可能就会出现负载不均衡的问题,无论是任务本身难易度,还是consumer处理能力的不同,都是导致这种问题。
为了处理这种情况我们可以使用basicQos
方法来设置prefetchCount = 1
。 这告诉rabbitMQ一次只给consumer一条消息,换句话来说,就是直到consumer发回ack,然后再向这个consumer发送下一条消息。
int
prefetchCount =
1
;
channel.basicQos(prefetchCount);
正是因为Fair dispatch是基于ack的,所有它最好和Message acknowledgment同时使用,否则在autoAck=true的情况下,单独设置Fair dispatch并没有效果。
下面是本人测试以上三种情况的测试代码,可以直接使用。
import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String QUEUE_NAME = "task_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = true; //消息持久化 channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 多个消息使用空格分隔 Scanner sc = new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // basicQos方法来设置prefetchCount = 1。 这告诉RabbitMQy一次只给worker一条消息,换句话来说,就是直到worker发回ack,然后再向这个worker发送下一条消息。 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 当consumer确认收到某个消息,并且已经处理完成,RabbitMQ可以删除它时,consumer会向RabbitMQ发送一个ack(nowledgement)。 boolean autoAck = true; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } protected static void doWork(String message) throws InterruptedException { for (char ch: message.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
发布/订阅(Publish/Subscribe)
一个完整的rabbitMQ消息模型是会有Exchange的。
rabbitMQ的消息模型的核心思想是producer永远不会直接发送任何消息到queue中,实际上,在很多情况下producer根本不知道一条消息是否被发送到了哪个queue中。
在rabbitMQ中,producer仅仅将消息发送到一个exchange中。要理解exchange也非常简单,它一边负责接收producer发送的消息, 另一边将消息推送到queue中。
exchange必须清楚的知道在收到消息之后该如何进行下一步的处理,比如是否应该将这条消息发送到某个queue中? 还是应该发送到多个queue中?还是应该直接丢弃这条消息等。
exchange模型如下:
exchange类型也有好几种:direct
,topic
,headers
以及fanout。
Fanout exchange
下面我们来创建一个fanout
类型的exchange,顾名思义,fanout会向所有的queue广播所有收到的消息。
1 import java.io.IOException; 2 import java.util.Scanner; 3 import java.util.concurrent.TimeoutException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 import rabbitMQ.RabbitMQTestUtil; 10 11 public class EmitLog { 12 13 private static final String EXCHANGE_NAME = "logs"; 14 15 public static void main(String[] argv) throws IOException, TimeoutException { 16 17 ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 22 23 // 多个消息使用空格分隔 24 Scanner sc = new Scanner(System.in); 25 String[] splits = sc.nextLine().split(" "); 26 for (int i = 0; i < splits.length; i++) { 27 channel.basicPublish(EXCHANGE_NAME, "", null, splits[i].getBytes()); 28 System.out.println(" [x] Sent '" + splits[i] + "'"); 29 } 30 31 channel.close(); 32 connection.close(); 33 } 34 }
1 import java.io.IOException; 2 3 import com.rabbitmq.client.AMQP; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 import com.rabbitmq.client.Consumer; 8 import com.rabbitmq.client.DefaultConsumer; 9 import com.rabbitmq.client.Envelope; 10 11 import rabbitMQ.RabbitMQTestUtil; 12 13 public class ReceiveLogs { 14 15 private static final String EXCHANGE_NAME = "logs"; 16 17 public static void main(String[] argv) throws Exception { 18 ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); 19 Connection connection = factory.newConnection(); 20 Channel channel = connection.createChannel(); 21 22 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 23 String queueName = channel.queueDeclare().getQueue(); 24 channel.queueBind(queueName, EXCHANGE_NAME, ""); 25 26 Consumer consumer = new DefaultConsumer(channel) { 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, 29 AMQP.BasicProperties properties, byte[] body) throws IOException { 30 String message = new String(body, "UTF-8"); 31 System.out.println(" [x] Received '" + message + "'"); 32 } 33 }; 34 channel.basicConsume(queueName, true, consumer); 35 } 36 }
Direct exchange
在fanout的exchange类型中,消息的发布已经队列的绑定方法中,routingKey参数都是默认空值,因为fanout类型会直接忽略这个值,
但是在其他exchange类型中它拥有很重要的意义,
rabbitMQ支持以上两种绑定,消息在发布的时候,会指定一个routing key,而图一中exchange会把routing key为orange
发送的消息将会被路由到queue Q1
中,使用routing key为black
或者green
的将会被路由到Q2
中。
将多个queue使用相同的binding key进行绑定也是可行的。可以在X和Q1中间增加一个routing key black
。 它会向所有匹配的queue进行广播,使用routing key为black
发送的消息将会同时被Q1
和Q2
接收。
下面是我测试debug和error两种routing key发布消息并接受处理消息的代码:
import java.util.Scanner; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitMQ.RabbitMQTestUtil; public class EmitLog { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, TimeoutException { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 多个消息使用空格分隔 Scanner sc = new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish(EXCHANGE_NAME, splits[i], null, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }
View Code
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import rabbitMQ.RabbitMQTestUtil; public class ReceiveLogsDebug { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "debug"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
View Code
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import rabbitMQ.RabbitMQTestUtil; public class ReceiveLogsError { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
View Code
发送输入:
debug接受:
error接受:
Topic exchange
发送到topic exchange中的消息不能有一个任意的routing_key
——它必须是一个使用点分隔的单词列表。单词可以是任意的。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。
routing key的长度限制为255个字节数。
binding key也必须是相同的形式。topic exchange背后的逻辑类似于direct——一条使用特定的routing key发送的消息将会被传递至所有使用与该routing key相同的binding key进行绑定的队列中。 然而,对binding key来说有两种特殊的情况:
- *(star)可以代替任意一个单词
- #(hash)可以代替0个或多个单词
和Direct exchange差不多,代码就不copy了,有兴趣的直接看看教程http://www.rabbitmq.com/tutorials/tutorial-five-java.html
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
span.s1 { color: #7e504f }
span.s2 { color: #931a68 }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco }
span.s1 { color: #7e504f }
span.s2 { color: #931a68 }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Monaco; color: #0326cc }