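I recently needed message queuing again, so I came back to RabbitMQ. Along the way I ran into a number of reliability problems, which fall roughly into these categories:

  1. Transient errors, such as a brief database network interruption or a temporarily failing HTTP request;

  2. Ordering errors: task A depends on task B, but scheduling or consumer assignment causes A to run before B;

  3. Business errors: because of insufficient testing, certain messages or certain kinds of messages turn out to be unprocessable after release;

  4. System errors: middleware the business depends on is unavailable, for example a network outage or a database crash;

  5. Illegal messages: forged or malicious messages.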

 

  To deal with these failures, I adopted a scheme based on message auditing, message retry, message retrieval, and message resending.

 

 

 

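  1. All messages go through an Exchange, in either direct or topic mode; fanout is not recommended.

  2. One or more Queues are bound to the Exchange according to business needs. An audit consumer (Audit) has its own queue bound to the same routing keys, so it receives a copy of every message and records it in MongoDB without blocking normal business processing.

  3. When publishing, the Publisher uses the AMQP message properties to set a message ID (MessageId) and a Timestamp, and adds business-specific Headers so the message is easy to trace.

  4. When a Consumer fails to process a message, it publishes the message to the retry exchange (Retry Exchange), sets its expiration (the retry delay), and updates the retry count; once the retry limit is exceeded, the message is discarded.

  5. The queue bound to the retry exchange is configured with a dead letter exchange (Dead Letter Exchange), so an expired message is automatically forwarded back to the business Exchange.

  6. The WebApi can search MongoDB by MessageId, Timestamp, and Headers in order to retrieve or resend messages.

  Note: MongoDB was chosen as the store mainly because it handles dynamic queries over headers well; comparable alternatives include Elasticsearch.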

  The publisher is set up as follows.

  1. Enable automatic connection recovery

    var factory = new ConnectionFactory
    {
        Uri = new Uri("amqp://guest:guest@192.168.132.137:5672"),
        AutomaticRecoveryEnabled = true
    };

  2. Declare the Exchange in direct mode

    channel.ExchangeDeclare("Exchange", "direct");

  3. Declare QueueA and QueueB according to business needs

    // Arguments: durable = true, exclusive = false, autoDelete = false
    channel.QueueDeclare("QueueA", true, false, false);
    channel.QueueBind("QueueA", "Exchange", "RouteA");
    channel.QueueDeclare("QueueB", true, false, false);
    channel.QueueBind("QueueB", "Exchange", "RouteB");

  4. Enable publisher confirms, so that every publish must be acknowledged by the RabbitMQ server

    channel.ConfirmSelect();

  5. Mark messages as persistent

    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

  6. Generate the MessageId, Timestamp, and Headers

    properties.MessageId = Guid.NewGuid().ToString("N");
    // This article stores the timestamp in Unix milliseconds and reads it back the same way.
    properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
    properties.Headers = new Dictionary<string, object>
    {
        { "key", "value" + i }
    };

  7. Publish the message: even sequence numbers go to QueueA (RouteA), odd ones to QueueB (RouteB)

    channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);

  8. Wait for the confirmation from the RabbitMQ server

    var isOk = channel.WaitForConfirms();
    if (!isOk)
    {
        throw new Exception("The message did not reach the server!");
    }

 

  Complete code

    using System;
    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;

    var factory = new ConnectionFactory
    {
        Uri = new Uri("amqp://guest:guest@localhost:5672"),
        AutomaticRecoveryEnabled = true
    };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            // Topology: one direct exchange, two durable queues
            channel.ExchangeDeclare("Exchange", "direct");
            channel.QueueDeclare("QueueA", true, false, false);
            channel.QueueBind("QueueA", "Exchange", "RouteA");
            channel.QueueDeclare("QueueB", true, false, false);
            channel.QueueBind("QueueB", "Exchange", "RouteB");

            // Publisher confirms
            channel.ConfirmSelect();

            for (var i = 0; i < 2; i++)
            {
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                properties.MessageId = Guid.NewGuid().ToString("N");
                // Unix milliseconds, matching how the audit consumer reads it back
                properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
                properties.Headers = new Dictionary<string, object>
                {
                    { "key", "value" + i }
                };

                var message = "Hello " + i;
                var body = Encoding.UTF8.GetBytes(message);

                // Even sequence numbers go to RouteA, odd ones to RouteB
                channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);

                var isOk = channel.WaitForConfirms();
                if (!isOk)
                {
                    throw new Exception("The message did not reach the server!");
                }
            }
        }
    }


 

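  Result: QueueA and QueueB each hold one message, and QueueAudit holds two.

  Note: a Queue must already be declared and bound to the Exchange before it can receive messages. The code above does not declare QueueAudit, so either declare it manually or run the consumer programs below first so that it gets declared.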

 

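  The consumer of QueueA is set up as follows.

  1. Set the prefetch count, so that messages are not simply dispatched round-robin regardless of how busy each consumer is. Choose the value as needed; here it is 1

    _channel.BasicQos(0, 1, false);

  2. Declare the Exchange and Queue

    _channel.ExchangeDeclare("Exchange", "direct");
    _channel.QueueDeclare("QueueA", true, false, false);
    _channel.QueueBind("QueueA", "Exchange", "RouteA");

  3. Write the callback

    var consumer = new EventingBasicConsumer(_channel);
    consumer.Received += (model, ea) =>
    {
        // Processing for QueueA always succeeds.
        try
        {
            _channel.BasicAck(ea.DeliveryTag, false);
        }
        catch (AlreadyClosedException ex)
        {
            _logger.LogCritical(ex, "RabbitMQ is closed!");
        }
    };
    // autoAck = false: messages are acknowledged manually in the callback
    _channel.BasicConsume("QueueA", false, consumer);

  Note: even with automatic recovery enabled, operations against RabbitMQ throw AlreadyClosedException while the connection is unavailable. If the exception goes unhandled, the main thread exits and the program cannot resume even after the connection is restored, so the exception must be caught and handled.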

 

  The consumer of QueueB, which demonstrates retries, is set up as follows.

  1. Set the prefetch count

    _channel.BasicQos(0, 1, false);

  2. Declare the Exchange and Queue

    _channel.ExchangeDeclare("Exchange", "direct");
    _channel.QueueDeclare("QueueB", true, false, false);
    _channel.QueueBind("QueueB", "Exchange", "RouteB");

  3. Declare the retry exchange and retry queue; the retry queue uses the business Exchange as its dead letter exchange (Dead Letter Exchange)

    // Messages that expire in QueueB_Retry are dead-lettered back to Exchange with routing key RouteB
    var retryDic = new Dictionary<string, object>
    {
        {"x-dead-letter-exchange", "Exchange"},
        {"x-dead-letter-routing-key", "RouteB"}
    };
    _channel.ExchangeDeclare("Exchange_Retry", "direct");
    _channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
    _channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");

  4. Configure the retries: three retries, the first after 1 second, the second after 10 seconds, the third after 30 seconds

    // Delays in milliseconds
    _retryTime = new List<int>
    {
        1 * 1000,
        10 * 1000,
        30 * 1000
    };

  5. Read the current retry count

    var retryCount = 0;
    if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
    {
        // The header holds the zero-based index of the retry that was scheduled; this delivery is retry number index + 1.
        retryCount = (int)ea.BasicProperties.Headers["retryCount"] + 1;
        _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {retryCount} retry started...");
    }

  6. When an exception occurs, decide whether the message can be retried

    private bool CanRetry(int retryCount)
    {
        // A retry is possible while there is still an unused delay in the schedule
        return retryCount < _retryTime.Count;
    }

  7. If it can be retried, start the retry

    private void SetupRetry(int retryCount, string retryExchange, string retryRoute, BasicDeliverEventArgs ea)
    {
        var body = ea.Body;
        var properties = ea.BasicProperties;
        properties.Headers = properties.Headers ?? new Dictionary<string, object>();
        properties.Headers["retryCount"] = retryCount;
        // Per-message TTL in milliseconds, passed as a string
        properties.Expiration = _retryTime[retryCount].ToString();
        try
        {
            _channel.BasicPublish(retryExchange, retryRoute, properties, body);
        }
        catch (AlreadyClosedException ex)
        {
            _logger.LogCritical(ex, "RabbitMQ is closed!");
        }
    }
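To make the retry bookkeeping easier to follow, here is a minimal sketch of the same schedule as pure logic, with no RabbitMQ dependency. The class and method names (`RetrySchedule`, `NextDelayMs`, `Trace`) are hypothetical and not part of the article's code; the delays are the 1 s, 10 s, and 30 s values used above.

```csharp
using System;
using System.Collections.Generic;

public static class RetrySchedule
{
    // Delays in milliseconds, as in the article: 1 s, 10 s, 30 s.
    private static readonly IReadOnlyList<int> DelaysMs = new[] { 1000, 10000, 30000 };

    // Returns the expiration (ms) for the next retry, or null when retries are exhausted.
    // retryCount is the number of retries already performed (0 on the first delivery).
    public static int? NextDelayMs(int retryCount)
    {
        if (retryCount < 0) throw new ArgumentOutOfRangeException(nameof(retryCount));
        return retryCount < DelaysMs.Count ? DelaysMs[retryCount] : (int?)null;
    }

    // Simulates a message whose handler always fails and lists what happens on each delivery.
    public static List<string> Trace()
    {
        var log = new List<string>();
        var retryCount = 0;
        while (true)
        {
            var delay = NextDelayMs(retryCount);
            if (delay == null)
            {
                log.Add($"delivery {retryCount + 1}: retries exhausted, Nack with requeue=false");
                return log;
            }
            log.Add($"delivery {retryCount + 1}: republish with retryCount={retryCount}, expiration={delay}");
            retryCount++; // the next delivery sees the header value plus one
        }
    }
}
```

With three delays configured, `Trace()` yields four entries: the original delivery plus three retries, the last of which gives up.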

 

  Complete code

    // Requires: using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions;
    _channel.BasicQos(0, 1, false);
    _channel.ExchangeDeclare("Exchange", "direct");
    _channel.QueueDeclare("QueueB", true, false, false);
    _channel.QueueBind("QueueB", "Exchange", "RouteB");

    // Retry queue: expired messages are dead-lettered back to Exchange / RouteB
    var retryDic = new Dictionary<string, object>
    {
        {"x-dead-letter-exchange", "Exchange"},
        {"x-dead-letter-routing-key", "RouteB"}
    };
    _channel.ExchangeDeclare("Exchange_Retry", "direct");
    _channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
    _channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");

    var consumer = new EventingBasicConsumer(_channel);
    consumer.Received += (model, ea) =>
    {
        // Processing for QueueB always fails.
        bool canAck;
        var retryCount = 0;
        if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
        {
            // The header holds the zero-based index of the retry that was scheduled; this delivery is retry number index + 1.
            retryCount = (int)ea.BasicProperties.Headers["retryCount"] + 1;
            _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {retryCount} retry started...");
        }

        try
        {
            Handle();
            canAck = true;
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Error!");
            if (CanRetry(retryCount))
            {
                // A copy goes to the retry queue, so the original can be acknowledged
                SetupRetry(retryCount, "Exchange_Retry", "RouteB_Retry", ea);
                canAck = true;
            }
            else
            {
                canAck = false;
            }
        }

        try
        {
            if (canAck)
            {
                _channel.BasicAck(ea.DeliveryTag, false);
            }
            else
            {
                // requeue = false: QueueB has no dead letter exchange, so the message is dropped
                _channel.BasicNack(ea.DeliveryTag, false, false);
            }
        }
        catch (AlreadyClosedException ex)
        {
            _logger.LogCritical(ex, "RabbitMQ is closed!");
        }
    };
    _channel.BasicConsume("QueueB", false, consumer);


 

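  The audit consumer is set up as follows.

  1. Declare the Exchange and Queue

    _channel.ExchangeDeclare("Exchange", "direct");
    _channel.QueueDeclare("QueueAudit", true, false, false);
    // Bind the audit queue to every business routing key so it receives a copy of each message
    _channel.QueueBind("QueueAudit", "Exchange", "RouteA");
    _channel.QueueBind("QueueAudit", "Exchange", "RouteB");

  2. Skip the duplicate messages forwarded by the dead letter exchange

    // RabbitMQ adds an "x-death" header when it dead-letters a message
    if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death"))
    {
        ...
    }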
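The duplicate check above can be isolated into a small predicate, which makes it easy to unit test. This is only a sketch: the `AuditFilter` class and `ShouldRecord` method are hypothetical names, and the only fact relied on is that a dead-lettered message carries an `x-death` header.

```csharp
using System.Collections.Generic;

public static class AuditFilter
{
    // True when the delivery is a first-time message that the audit log should record.
    // A message re-routed through the dead letter exchange carries "x-death" and is skipped.
    public static bool ShouldRecord(IDictionary<string, object> headers)
    {
        return headers == null || !headers.ContainsKey("x-death");
    }
}
```

A message with no headers or with only business headers is recorded; a retried copy coming back from the retry queue is not.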

 

  3. Build the message entity

    var message = new Message
    {
        MessageId = ea.BasicProperties.MessageId,
        Body = ea.Body,
        Exchange = ea.Exchange,
        Route = ea.RoutingKey
    };

  4. RabbitMQ delivers string header values as bytes, so the byte values in the headers have to be converted back to strings

    if (ea.BasicProperties.Headers != null)
    {
        var headers = new Dictionary<string, object>();
        foreach (var header in ea.BasicProperties.Headers)
        {
            if (header.Value is byte[] bytes)
            {
                // String values arrive as UTF-8 bytes
                headers[header.Key] = Encoding.UTF8.GetString(bytes);
            }
            else
            {
                // Numbers, booleans, and other values are stored unchanged
                headers[header.Key] = header.Value;
            }
        }
        message.Headers = headers;
    }
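The loop above converts only top-level values. Headers can also hold nested structures (for instance, `x-death` is a list of tables), so a recursive variant may be useful if such headers are ever stored. The sketch below is an assumption-laden illustration: `HeaderDecoder` is a hypothetical name, and it assumes nested tables arrive as `IDictionary<string, object>` and arrays as `IList<object>`.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class HeaderDecoder
{
    // Recursively converts UTF-8 byte arrays to strings inside a header value.
    public static object Decode(object value)
    {
        switch (value)
        {
            case byte[] bytes:
                return Encoding.UTF8.GetString(bytes);
            case IDictionary<string, object> table:
                // Nested table: decode every value, keep the keys
                return table.ToDictionary(kv => kv.Key, kv => Decode(kv.Value));
            case IList<object> list:
                // Array: decode every element
                return list.Select(item => Decode(item)).ToList();
            default:
                // Numbers, booleans, null, and so on pass through unchanged
                return value;
        }
    }
}
```

Calling `HeaderDecoder.Decode(header.Value)` in place of the `is byte[]` check would then cover both the flat and the nested case.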

 

  5. Convert the Unix-format Timestamp to UTC time

    if (ea.BasicProperties.Timestamp.UnixTime > 0)
    {
        message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;
        // The publisher wrote Unix milliseconds, so the value is read back as milliseconds
        var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);
        message.Timestamp = offset.UtcDateTime;
    }
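The conversion only works if publisher and consumer agree on the unit. The following sketch shows the millisecond round trip in isolation; `TimestampConversion` and its methods are hypothetical helper names, and the sample value is an arbitrary instant chosen for illustration.

```csharp
using System;

public static class TimestampConversion
{
    // Publisher side: instant to Unix milliseconds.
    public static long ToUnixMilliseconds(DateTimeOffset instant)
    {
        return instant.ToUnixTimeMilliseconds();
    }

    // Consumer side: Unix milliseconds back to a UTC DateTime.
    public static DateTime FromUnixMilliseconds(long unixMilliseconds)
    {
        return DateTimeOffset.FromUnixTimeMilliseconds(unixMilliseconds).UtcDateTime;
    }
}
```

For example, `FromUnixMilliseconds(1534982400000)` gives 2018-08-23 00:00:00 UTC, and converting that instant back yields the same number. A consumer that read the value as seconds instead would not recover the same instant.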

 

  6. Store the message in MongoDB

    _mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);

  MongoDB records: (screenshot)

  Retry records: (screenshot)

  

 

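  The WebApi supports the following operations.

  1. Retrieve a message by its message ID

  2. Retrieve messages by header values

  3. Resend a message, which generates a new MessageId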
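As a rough illustration of the resend step, the sketch below copies a stored record and assigns a fresh MessageId. Everything here is hypothetical: the `StoredMessage` shape only mirrors the fields shown in the audit code, and the `originalMessageId` header is an assumption added for traceability, not something the original project is known to do.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape of an audited record; the real Message class may differ.
public class StoredMessage
{
    public string MessageId { get; set; }
    public string Exchange { get; set; }
    public string Route { get; set; }
    public byte[] Body { get; set; }
    public IDictionary<string, object> Headers { get; set; }
}

public static class Resender
{
    // Builds the message to publish again: same exchange, route, body, and headers,
    // but with a newly generated MessageId.
    public static StoredMessage PrepareResend(StoredMessage original)
    {
        if (original == null) throw new ArgumentNullException(nameof(original));

        // Copy the headers so the stored record is left untouched
        var headers = original.Headers == null
            ? new Dictionary<string, object>()
            : new Dictionary<string, object>(original.Headers);

        // Assumption: keep a link to the original message for tracing
        headers["originalMessageId"] = original.MessageId;

        return new StoredMessage
        {
            MessageId = Guid.NewGuid().ToString("N"),
            Exchange = original.Exchange,
            Route = original.Route,
            Body = original.Body,
            Headers = headers
        };
    }
}
```

The returned object would then be published to `Exchange` with `Route` as the routing key, in the same way the publisher sends any other message.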

  

  

 

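  Ack, Nack, and Reject behave as follows.

  1. When processing succeeds, the consumer sends Ack and RabbitMQ removes the message from the queue.

  2. When processing fails, the consumer sends Nack or Reject:

  a) With requeue=true, the message goes back to the queue and is normally redelivered right away, often to the same consumer;

  b) With requeue=false, if the Queue has a Dead Letter Exchange configured, the message is routed to the Dead Letter Exchange;

  c) With requeue=false, if the Queue has no Dead Letter Exchange configured, the message is removed from the queue, the same outcome as Ack.

  3. The difference between Nack and Reject: Nack can settle multiple messages at once, while Reject handles only one message at a time.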
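The rules above amount to a small decision table. The sketch below encodes it as a pure function; the `Settlement` and `Fate` enums and the `SettlementRules` class are hypothetical names used only for illustration.

```csharp
public enum Settlement { Ack, NackOrReject }

public enum Fate { RemovedFromQueue, Requeued, DeadLettered }

public static class SettlementRules
{
    // Returns what happens to a message after the consumer settles it.
    public static Fate Resolve(Settlement settlement, bool requeue, bool queueHasDeadLetterExchange)
    {
        if (settlement == Settlement.Ack)
        {
            return Fate.RemovedFromQueue;
        }
        if (requeue)
        {
            // Goes back to the queue and becomes eligible for redelivery
            return Fate.Requeued;
        }
        // Not requeued: dead-lettered if a dead letter exchange is configured, otherwise dropped
        return queueHasDeadLetterExchange ? Fate.DeadLettered : Fate.RemovedFromQueue;
    }
}
```

In the QueueB consumer of this article, the final Nack uses requeue=false on a queue without a dead letter exchange, which corresponds to `Fate.RemovedFromQueue`.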

  

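  Automatic connection recovery performs the following steps:

  1. Reconnect

  2. Restore connection listeners

  3. Re-open channels

  4. Restore channel listeners

  5. Restore the channel's basic.qos setting, publisher confirms, and transaction settings

  Topology recovery then performs the following actions:

  1. Re-declare exchanges

  2. Re-declare queues

  3. Recover all bindings

  4. Recover all consumers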
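Both kinds of recovery are switched on through the connection factory. The fragment below is a configuration sketch that assumes the RabbitMQ.Client package; `TopologyRecoveryEnabled` and `NetworkRecoveryInterval` are properties of `ConnectionFactory`, while the URI and the 10-second interval are placeholder values chosen for illustration.

```csharp
using System;
using RabbitMQ.Client;

var factory = new ConnectionFactory
{
    Uri = new Uri("amqp://guest:guest@localhost:5672"),
    // Reconnect and restore channels, listeners, QoS, and confirm settings
    AutomaticRecoveryEnabled = true,
    // Re-declare exchanges, queues, bindings, and consumers after reconnecting
    TopologyRecoveryEnabled = true,
    // How long to wait between reconnection attempts (placeholder value)
    NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
```

Recovery does not remove the need to catch AlreadyClosedException: operations issued while the connection is down still fail until recovery completes.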

 

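  Each kind of failure is handled as follows.

  1. Transient errors, such as a brief database network interruption or a temporarily failing HTTP request

  Handle these with a short-delay retry (for example after 1 second). Nack/Reject with requeue=true is another option and retries even sooner.

  2. Ordering errors: task A depends on task B, but scheduling or consumer assignment causes A to run before B

  Handle these with long-delay retries (for example 1 minute, 30 minutes, 1 hour, or 1 day), waiting for task B to finish first.

  3. Business errors: because of insufficient testing, certain messages or certain kinds of messages turn out to be unprocessable after release

  Once the system has been fixed, handle these by resending the messages.

  4. System errors: middleware the business depends on is unavailable, for example a network outage or a database crash

  Once the system has recovered, handle these by resending the messages.

  5. Illegal messages: forged or malicious messages

  After several failed retries the message is removed from the queue; further handling specific to this case can be added if needed.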
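The mapping from failure category to handling strategy can be written down as a lookup, which is convenient when retry policy is configured per message type. This is a sketch only: the enum and class names are hypothetical, and in practice a consumer often cannot tell the categories apart at the moment of failure, which is why the scheme retries first and falls back to resending.

```csharp
using System;

public enum FailureKind { Transient, Ordering, Business, Infrastructure, Illegal }

public enum Handling
{
    ShortDelayRetry,      // e.g. retry after 1 second
    LongDelayRetry,       // e.g. retry after minutes or hours
    ResendAfterFix,       // resend from the audit store once the code is fixed
    ResendAfterRecovery,  // resend from the audit store once the system is back
    DiscardAfterRetries   // let the message drop after the retries fail
}

public static class FailureHandling
{
    // Returns the strategy this article proposes for each failure category.
    public static Handling For(FailureKind kind)
    {
        switch (kind)
        {
            case FailureKind.Transient: return Handling.ShortDelayRetry;
            case FailureKind.Ordering: return Handling.LongDelayRetry;
            case FailureKind.Business: return Handling.ResendAfterFix;
            case FailureKind.Infrastructure: return Handling.ResendAfterRecovery;
            case FailureKind.Illegal: return Handling.DiscardAfterRetries;
            default: throw new ArgumentOutOfRangeException(nameof(kind));
        }
    }
}
```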

 

https://github.com/ErikXu/RabbitMesage

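Copyright notice: this is an original article by Erik_Xu, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Article link: https://www.cnblogs.com/Erik_Xu/p/9515208.html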