当消息在一个队列中变成一个死信之后,它将被重新publish到另一个交换机上,这个交换机我们就叫做死信交换机,私信交换机将死信投递到一个队列上就是死信队列。具体原理如下图:

死信交换机.png

消息变成死信的三种情况:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

应答模式分为两种,手动签收和自动签收,自动应答就是消费者消费了一条消息就自动告诉队列删除消息。这样的弊端就是不管消费逻辑有没有成功,都会将消息删除,这样就会造成消息丢失。而使用手动签收后,就是在消费逻辑处理成功后,手动告诉队列消费成功,然后队列再去删除这条消息。

  1. 再消费者配置文件中开启手动签收模式
  1. spring.rabbitmq.listener.simple.acknowledge-mode = manual
  1. 在消费逻辑处理成功后手动签收,修改消费者代码
  1. @RabbitListener(queues = QUEUE_NAME)
  2. public void receiveMessage(Message message,@Headers Map<String,Object> headers, Channel channel) throws Exception {
  3. Jedis jedis = new Jedis("localhost", 6379);
  4. String messageId = message.getMessageProperties().getMessageId();
  5. String msg = new String(message.getBody(),"UTF-8");
  6. System.out.println("接收导的消息为:"+msg+"==消息id为:"+messageId);
  7. String messageIdRedis = jedis.get("messageId");
  8. if(messageId == messageIdRedis){
  9. return;
  10. }
  11. JSONObject jsonObject = JSONObject.parseObject(msg);
  12. String email = jsonObject.getString("email");
  13. String content = jsonObject.getString("timestamp");
  14. String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;
  15. // 如果发生异常则返回null
  16. String body = HttpUtils.httpGet(httpUrl, "utf-8");
  17. //
  18. if(body == null){
  19. throw new Exception();
  20. }
  21. jedis.set("messageId",messageId);
  22. Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
  23. // 手动签收
  24. channel.basicAck(deliveryTag,false);
  25. }
  • 消息被拒绝

我们继续修改一下消费者代码,尝试让消费者消费的时候发生异常。然后在catch块中拒绝消息。

  1. // 拒绝消息,给死信队列
  2. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

我们运行程序后发现,**当消息消费异常后在队列”zb-byte1“中的消息被消费了,同时发现在死信队列”dead-byte-zb“中有一条未被消费的消息。**消息到死信队列后,然后我们在创建一个消费者去消费消息就可以了。当然死信队列也需要去手动签收消息。

  • 消息TTL过期

这种模式我们也叫做延迟消费,有一种特别经典的案例就是用户在一个商品抢购系统中,用户抢到商品后需要在30分钟时间内支付,不然订单无效。这时候我们就可以通过消息TTL过期来实现,设置队列消息过期时间为30分钟,30分钟后publish到死信队列,我们在死信队列中消费订单状态是否支付成功来判断该订单是否有效。

非常简单,我们只需要在配置死信交换机的时候设置有效时间就可以了

  1. @Bean
  2. public Queue queue(){
  3. Map<String,Object> map = new HashMap<>();
  4. map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
  5. map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
  6. map.put("x-message-ttl",7200); // 队列过期时间
  7. Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
  8. return queue;
  9. }
  • 队列达到最大长度

设置队列长度即可:

  1. @Bean
  2. public Queue queue(){
  3. Map<String,Object> map = new HashMap<>();
  4. map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
  5. map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
  6. map.put("x-max-length",3);
  7. // map.put("x-message-ttl",7200); // 队列过期时间
  8. Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
  9. return queue;
  10. }

设置好之后,我们先不要启动消费者,然后调用生成者往队列中发送消息,当消息长度大于3时,我们发现消息进入了死信队列。

注意:前文中也提到过,队列不能被修改,也就是说已经创建好的队列设置了过期时常为7200s,然后我们注释掉,增加队列长度是3的代码,这样运行会报错,必须在rabbitmq中将该队列删除,然后重新生成队列才可以。

如果文章对您有帮助,请记得点赞关注哟~
欢迎大家关注我的公众号:字节传说,每日推送技术文章供大家学习参考。

版权声明:本文为zhixie原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/zhixie/p/12550293.html