工作队列work queues 公平分发(fair dispatch) And 消息应答与消息持久化
生产者
1 package cn.wh.work; 2 3 import cn.wh.util.RabbitMqConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public class Send { 8 private static final String QUEVE_NAME = "test_work_queue"; 9 10 public static void main(String[] args) throws Exception { 11 ; 12 Connection connection = RabbitMqConnectionUtil.getConnection(); 13 Channel channel = connection.createChannel(); 14 channel.queueDeclare(QUEVE_NAME, false, false, false, null); 15 16 int i1 =1 ; 17 channel.basicQos(i1); 18 for (int i = 0; i < 50; i++) { 19 String msg = "hello " + i; 20 System.out.println(msg); 21 channel.basicPublish("", QUEVE_NAME, null, msg.getBytes()); 22 Thread.sleep(i * 20); 23 } 24 channel.close(); 25 connection.close(); 26 } 27 }
消费者 1
1 package cn.wh.work; 2 3 import cn.wh.util.RabbitMqConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class Recv1 { 9 private static final String QUEVE_NAME = "test_work_queue"; 10 public static void main(String[] args) throws Exception { 11 Connection connection = RabbitMqConnectionUtil.getConnection(); 12 final Channel channel = connection.createChannel(); 13 int i1 =1 ; 14 channel.basicQos(i1); 15 DefaultConsumer consumer = new DefaultConsumer(channel) { 16 @Override 17 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 18 String msg = new String(body); 19 System.out.println("recv1"+msg); 20 try { 21 Thread.sleep(2000); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 }finally { 25 System.out.println(1+"OK"); 26 channel.basicAck(envelope.getDeliveryTag(),false); 27 } 28 } 29 }; 30 boolean autoAck=false; 31 channel.basicConsume(QUEVE_NAME,autoAck,consumer); 32 } 33 }
消费者2
1 package cn.wh.work; 2 3 import cn.wh.util.RabbitMqConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class Recv2 { 9 10 private static final String QUEVE_NAME = "test_work_queue"; 11 public static void main(String[] args) throws Exception { 12 Connection connection = RabbitMqConnectionUtil.getConnection(); 13 final Channel channel = connection.createChannel(); 14 int i1 =1 ; 15 channel.basicQos(i1); 16 DefaultConsumer consumer = new DefaultConsumer(channel) { 17 @Override 18 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 19 String msg = new String(body); 20 System.out.println("recv2"+msg); 21 22 try { 23 Thread.sleep(1000); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 }finally { 27 System.out.println(2+"OK"); 28 channel.basicAck(envelope.getDeliveryTag(),false); 29 } 30 } 31 }; 32 boolean autoAck=false; 33 channel.basicConsume(QUEVE_NAME,autoAck,consumer); 34 } 35 36 }
这时候现象就是消费者 1 速度大于消费者
Message acknowledgment(消息应答)
- boolean autoAck = true;(自动确认模式)一旦 RabbitMQ 将消息分发给了消费者,就会从内存中删除。在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息。
- boolean autoAck = false; (手动确认模式) 我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。 为了确保消息不会丢失,RabbitMQ 支持消息应答。消费者送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了。RabbitMQ 可以删除它了。
- 消息应答是默认打开的。也就是 boolean autoAck =false
Message durability(消息持久化)
我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍将失去!当 RabbitMQ 退出或者崩溃,将会丢失队列和消息。除非你不要队列和消息。两件事儿必须保证消息不被丢失:我们必须把“队列”和“消息”设为持久化。
1. boolean durable = true;
2. channel.queueDeclare(“test_queue_work”, durable, false, false, null);那么我们直接将程序里面的 false 改成 true 就行了?? 不可以会 报异常 channel error; protocol method: #method<channel.close>(reply-code=406, replytext=PRECONDITION_FAILED – inequivalent arg \’durable\’ for queue \’test_queue_work\’
尽管这行代码是正确的,他不会运行成功。因为我们已经定义了一个名叫 test_queue_work 的未持久化的队列。RabbitMQ 不允许使用不同的参数设定重新定义已经存在的队列,并且会返回一个错误。一个快速的解决方案——就是声明一个不同名字的队列,比如 task_queue。或者我们登录控制台将队列删除就可