官网也讲述的清楚而且还有例子,只不过是英文的,很多人看到英文就不明白说什么了吧,即使有翻译成中文,总觉得哪里怪怪的,有些翻译并不流畅。我还是支持多看官网,官网:https://www.rabbitmq.com/ 。下面是自己做的一点下笔记,有参考其他文档。如有什么不对的地方,希望大家能够告诉我,通过留言板,像消息队列一样,你发送消息,我接收消息。

  • 什么是消息队列:即MQ,Message Queue。是一种应用程序对应用对应用程序的通信方式。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来连接他们。消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断从队列中获取消息。【因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦
  • AMQP和JMS

    • AMQP:即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准(高级消息队列协议)。【限定了数据传输的格式和方式,跨语言跨平台,和http协议类似】。是应用层协议的一个开放标准,为面向消息的中间件设计。
    • JMS:Java MessageService,实际上指JMS API。JMS是SUN公司早期提出的消息标准,旨在为Java应用提供统一的消息操作,包括create、send、recieve等。JMS已经成为java Enterprise Edition的一部分。
    • 两者间的区别和联系:

      • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式。
      • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
      • JMS规定了两种消息模型;而AMQP的消息模型更加丰富。
  • 常见MQ产品:
    • ActiveMQ:基于JMS, Apache旗下的
    • RabbitMQ:基于AMQP协议,erlang(一种通用的面向并发的编程语言)语言开发,稳定性好
    • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
    • Kafka:分布式消息系统,高吞吐量
  • 下载安装:先安装erlang,在安装rabbitmq。(这里不详细说明,如果有不是很清楚的,可以去搜索,相信很多伙伴已经分享过很多这样的博客啦)
  • 启动步骤【window系统下的cmd命令行】:cd \RabbitMQ Server\rabbitmq_server-3.7.15\sbin(进入到你安装的rabbitmq目录的sbin目录) –> rabbitmq-plugins enable rabbitmq_management –> rabbitmq-server
  • 五种消息模型:RabbitMQ提供了6中消息模型,但是第6种RPC,并不是MQ。其中3/4/5这三种属于订阅模式,只不过进行路由的方式不同。

    • 基本消息模型:RabbitMQ是一个消息的代理者(Message broker):它接收消息并且传递消息。你可以认为它是一个邮局,当你投递邮件到一个邮箱,你很肯定邮递员终究会将邮件递交给你的收件人。与此类似,RabbitMQ可以是一个邮箱、邮局、同时还是邮递员。不同之处在于:RabbitMQ不是传递纸质邮件,而是二进制的数据。

      •   问题:那么RabbitMQ怎么知道消息被接收了呢?(转换思维,即如何避免消息的丢失?)【答:消费者的消息确认机制Acknowlege,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:① 自动ACK:消息一旦被接收,消费者自动发送ACK;② 手动ACK:消息接收后,不会发送ACK,需要手动调用。很好理解,望名知意嘛,嘿嘿嘿!!!】。
        1. public class ConnectionUtil {
        2. /**
        3. * 建立与RabbitMQ的链接
        4. */
        5. public static Connection getConnection() throws IOException, TimeoutException {
        6. // 定义连接工厂
        7. ConnectionFactory factory = new ConnectionFactory();
        8. // 设置服务地址
        9. factory.setHost("127.0.0.1");
        10. // 端口
        11. factory.setPort(5672);
        12. // 设置账号信息,用户名、密码、vhost
        13. factory.setVirtualHost("/demo");
        14. factory.setUsername("guest");
        15. factory.setPassword("guest");
        16. // 通过工厂获取连接
        17. Connection connection = factory.newConnection();
        18. return connection;
        19. }
        20. }
        1. public class Send {
        2. private final static String QUEUE_NAME = "simple_queue";
        3. public static void main(String[] args) throws IOException, TimeoutException {
        4. // 获取到连接
        5. Connection connection = ConnectionUtil.getConnection();
        6. // 从连接中创建通道,使用通道才能完成消息相关的操作
        7. Channel channel = connection.createChannel();
        8. // 声明(创建)队列
        9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        10. // 消息内容
        11. String message = "Hello word!";
        12. // 向指定的队列中发送消息
        13. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        14. System.out.println(" [x] Sent '" + message + "'");
        15. // 关闭通道和连接
        16. channel.close();
        17. connection.close();
        18. }
        19. }
        1. public class Recv {
        2. private final static String QUEUE_NAME = "simple_queue";
        3. public static void main(String[] args) throws Exception {
        4. // 获取连接
        5. Connection connection = ConnectionUtil.getConnection();
        6. // 创建通道
        7. Channel channel = connection.createChannel();
        8. // 声明队列
        9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        10. // 定义队列的消费者
        11. DefaultConsumer consumer = new DefaultConsumer(channel) {
        12. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用
        13. @Override
        14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
        15. byte[] body) throws IOException {
        16. // body 即消息体
        17. String msg = new String(body);
        18. System.out.println(" [x] received: " + msg + "!");
        19. }
        20. };
        21. // 监听队列,第二个参数:是否自动进行消息确认
        22. channel.basicConsume(QUEUE_NAME, true, consumer);
        23. }
        24. }
        1. public class Recv2 {
        2. private final static String QUEUE_NAME = "simple_queue";
        3. public static void main(String[] args) throws Exception {
        4. // 获取连接
        5. Connection connection = ConnectionUtil.getConnection();
        6. // 创建通道
        7. Channel channel = connection.createChannel();
        8. // 声明队列
        9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        10. // 定义队列的消费者
        11. DefaultConsumer consumer = new DefaultConsumer(channel) {
        12. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用
        13. @Override
        14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
        15. byte[] body) throws IOException {
        16. // body 即消息体
        17. String msg = new String(body);
        18. System.out.println(" [x] received: " + msg + "!");
        19. // 手动进行ACK
        20. channel.basicAck(envelope.getDeliveryTag(), false);
        21. }
        22. };
        23. // 监听队列,第二个参数false,手动进行ACK
        24. channel.basicConsume(QUEUE_NAME, false, consumer);
        25. }
        26. }
    • work消息模型:也称为Task queue任务模型。当消息处理比较耗时的时候,可能产生消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用该模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

      •   问题:又会存在疑惑,任务是如何分配的,可以和显示生活中任务的分配联想起来呦?【答案:①  默认是任务平分,一次分配完全(即公平分配);② channel.basicQos(int num); 设置每个消费者同时只能处理num条数据(即能者多劳,耗时小的多处理些,你懂滴)】。
        1. public class Send {
        2. private final static String QUEUE_NAME = "task_work_queue";
        3. public static void main(String[] args) throws Exception {
        4. // 获取到连接
        5. Connection connection = ConnectionUtil.getConnection();
        6. // 从连接中创建通道,使用通道才能完成消息相关的操作
        7. final Channel channel = connection.createChannel();
        8. // 声明(创建)队列
        9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        10. // 循环发布任务
        11. for (int i=0; i<50; i++) {
        12. // 消息内容
        13. String message = "task ... " + i;
        14. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        15. System.out.println(" [x] Sent '" + message + "'");
        16. Thread.sleep(i * 2);
        17. }
        18. // 关闭通道和连接
        19. channel.close();
        20. connection.close();
        21. }
        22. }
        1. public class Recv {
        2. private final static String QUEUE_NAME = "task_work_queue";
        3. public static void main(String[] args) throws Exception {
        4. // 获取连接
        5. Connection connection = ConnectionUtil.getConnection();
        6. // 创建通道
        7. Channel channel = connection.createChannel();
        8. // 声明队列
        9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        10. // 设置每个消费同时只能处理一条消息
        11. channel.basicQos(1);
        12. // 定义队列的消费者
        13. DefaultConsumer consumer = new DefaultConsumer(channel) {
        14. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用
        15. @Override
        16. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
        17. byte[] body) throws IOException {
        18. // body 即消息体
        19. String msg = new String(body);
        20. System.out.println(" [x] received: " + msg + "!");
        21. try {
        22. // 模拟完成任务的耗时:1000ms
        23. Thread.sleep(1000);
        24. } catch (InterruptedException e) {
        25. e.printStackTrace();
        26. }
        27. channel.basicAck(envelope.getDeliveryTag(), false);
        28. }
        29. };
        30. // 监听队列,第二个参数:是否自动进行消息确认
        31. channel.basicConsume(QUEUE_NAME, false, consumer);
        32. }
        33. }
        1. /*
        2. * 对比上个消费者:耗时小,完成任务多些
        3. */
        4. public class Recv2 {
        5. private final static String QUEUE_NAME = "task_work_queue";
        6. public static void main(String[] args) throws Exception {
        7. // 获取连接
        8. Connection connection = ConnectionUtil.getConnection();
        9. // 创建通道
        10. final Channel channel = connection.createChannel();
        11. // 声明队列
        12. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        13. // 设置每个消费同时只能处理一条消息
        14. channel.basicQos(1);
        15. // 定义队列的消费者
        16. DefaultConsumer consumer = new DefaultConsumer(channel) {
        17. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用
        18. @Override
        19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
        20. byte[] body) throws IOException {
        21. // body 即消息体
        22. String msg = new String(body);
        23. System.out.println(" [x] received: " + msg + "!");
        24. // 手动进行ACK
        25. channel.basicAck(envelope.getDeliveryTag(), false);
        26. }
        27. };
        28. // 监听队列,第二个参数false,手动进行ACK
        29. channel.basicConsume(QUEUE_NAME, false, consumer);
        30. }
        31. }
    • 订阅模型分类:

      • 订阅模型 – Fanout:广播。一条消息,会被所有订阅的队列消费。

          1. public class Send {
          2. private final static String EXCHANGE_NAME = "fanout_exchange_test";
          3. public static void main(String[] args) throws Exception {
          4. // 获取连接
          5. Connection connection = ConnectionUtil.getConnection();
          6. // 获取通道
          7. Channel channel = connection.createChannel();
          8. // 声明exchange,指定类型为fanout
          9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
          10. // 消息内容
          11. String message = "Hello everyone";
          12. // 发布消息到Exchange
          13. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
          14. System.out.println(" [生产者] Sent '" + message + "'");
          15. channel.close();
          16. connection.close();
          17. }
          18. }
          1. public class Recv {
          2. private final static String QUEUE_NAME = "fanout_exchange_queue_1";
          3. private final static String EXCHANGE_NAME = "fanout_exchange_test";
          4. public static void main(String[] args) throws IOException, TimeoutException {
          5. // 获取到链接
          6. Connection connection = ConnectionUtil.getConnection();
          7. // 获取通道
          8. Channel channel = connection.createChannel();
          9. // 声明队列
          10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
          11. // 绑定队列到交换机
          12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
          13. // 定义队列的消费者
          14. DefaultConsumer consumer = new DefaultConsumer(channel) {
          15. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用
          16. @Override
          17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          18. String msg = new String(body);
          19. System.out.println(" [消费者1] received: " + msg + "!");
          20. }
          21. };
          22. // 监听队列,自动返回完成
          23. channel.basicConsume(QUEUE_NAME, true, consumer);
          24. }
          25. }
          1. public class Recv2 {
          2. private final static String QUEUE_NAME = "fanout_exchange_queue_2";
          3. private final static String EXCHANGE_NAME = "fanout_exchange_test";
          4. public static void main(String[] args) throws IOException, TimeoutException {
          5. // 获取到链接
          6. Connection connection = ConnectionUtil.getConnection();
          7. // 获取通道
          8. Channel channel = connection.createChannel();
          9. // 声明队列
          10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
          11. // 绑定队列到交换机
          12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
          13. // 定义队列的消费者
          14. DefaultConsumer consumer = new DefaultConsumer(channel) {
          15. @Override
          16. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          17. String msg = new String(body);
          18. System.out.println(" [消费者2] received: " + msg + "!");
          19. }
          20. };
          21. // 监听队列,自动返回完成
          22. channel.basicConsume(QUEUE_NAME, true, consumer);
          23. }
          24. }
      • 订阅模型 – Direct:不同的消息被不同的队列消费。在Direct模型下:

        •   队列与交换机的绑定,不能是任意绑定,而是要指定至少一个 RoutingKey(路由key);
        •   消息的发送方向 Exchange 发送消息时,也必须指定消息的 RoutingKey;
        •   Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息。
          1. public class Send {
          2. private final static String EXCHANGE_NAME = "direct_exchange_test";
          3. public static void main(String[] args) throws Exception {
          4. // 获取连接
          5. Connection connection = ConnectionUtil.getConnection();
          6. // 获取通道
          7. Channel channel = connection.createChannel();
          8. // 声明exchange,指定类型为direct
          9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
          10. // 消息内容
          11. String message = "商品增加了,id = 1002";
          12. // 发布消息到Exchange,并且指定routing key为:delete,代表删除商品
          13. channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
          14. System.out.println(" [商品服务] Sent '" + message + "'");
          15. channel.close();
          16. connection.close();
          17. }
          18. }
          1. public class Recv {
          2. private final static String QUEUE_NAME = "direct_exchange_queue_1";
          3. private final static String EXCHANGE_NAME = "direct_exchange_test";
          4. public static void main(String[] args) throws IOException, TimeoutException {
          5. // 获取到链接
          6. Connection connection = ConnectionUtil.getConnection();
          7. // 获取通道
          8. Channel channel = connection.createChannel();
          9. // 声明队列
          10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
          11. // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
          12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
          13. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
          14. // 定义队列的消费者
          15. DefaultConsumer consumer = new DefaultConsumer(channel) {
          16. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用
          17. @Override
          18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          19. String msg = new String(body);
          20. System.out.println(" [消费者1] received: " + msg + "!");
          21. }
          22. };
          23. // 监听队列,自动返回完成
          24. channel.basicConsume(QUEUE_NAME, true, consumer);
          25. }
          26. }
          1. public class Recv2 {
          2. private final static String QUEUE_NAME = "direct_exchange_queue_2";
          3. private final static String EXCHANGE_NAME = "direct_exchange_test";
          4. public static void main(String[] args) throws IOException, TimeoutException {
          5. // 获取到链接
          6. Connection connection = ConnectionUtil.getConnection();
          7. // 获取通道
          8. Channel channel = connection.createChannel();
          9. // 声明队列
          10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
          11. // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
          12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
          13. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
          14. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
          15. // 定义队列的消费者
          16. DefaultConsumer consumer = new DefaultConsumer(channel) {
          17. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用
          18. @Override
          19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          20. String msg = new String(body);
          21. System.out.println(" [消费者2] received: " + msg + "!");
          22. }
          23. };
          24. // 监听队列,自动返回完成
          25. channel.basicConsume(QUEUE_NAME, true, consumer);
          26. }
          27. }
      • 订阅模型 – Topic:可以根据RoutingKey把消息路由到不同的队列,Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符

        • RoutingKey 一般都是有一个或多个单词组成,多个单词之间以“ . ”分割,例:item.insert。
        • 通配符规则:

          • #:匹配一个或多个词
          • *:匹配不多不少恰好一个词
          1. public class Send {
          2. private final static String EXCHANGE_NAME = "topic_durable_exchange_test";
          3. public static void main(String[] args) throws Exception {
          4. // 获取连接
          5. Connection connection = ConnectionUtil.getConnection();
          6. // 获取通道
          7. Channel channel = connection.createChannel();
          8. // 开启生产者确认
          9. // channel.confirmSelect();
          10. // 声明exchange,指定类型为topic, 并且设置durable为true,持久化
          11. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
          12. // 消息内容
          13. String message = "商品新增了,id = 1002";
          14. // 发布消息到Exchange,并且指定routing key,消息持久化
          15. channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
          16. System.out.println(" [商品服务] Sent '" + message + "'");
          17. // 等待rabbitmq的确认消息,true为确认收到,false为发出有误
          18. // channel.waitForConfirms();
          19. channel.close();
          20. connection.close();
          21. }
          22. }
          1. public class Recv {
          2. private final static String QUEUE_NAME = "topic_durable_exchange_queue_1";
          3. private final static String EXCHANGE_NAME = "topic_durable_exchange_test";
          4. public static void main(String[] args) throws IOException, TimeoutException {
          5. // 获取到链接
          6. Connection connection = ConnectionUtil.getConnection();
          7. // 获取通道
          8. Channel channel = connection.createChannel();
          9. // 声明队列, 第二个参数:true代表声明为持久化
          10. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
          11. // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
          12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
          13. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
          14. // 定义队列的消费者
          15. DefaultConsumer consumer = new DefaultConsumer(channel) {
          16. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用
          17. @Override
          18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          19. String msg = new String(body);
          20. System.out.println(" [消费者1] received: " + msg + "!");
          21. }
          22. };
          23. // 监听队列,自动返回完成
          24. channel.basicConsume(QUEUE_NAME, true, consumer);
          25. }
          26. }
          1. public class Recv2 {
          2. private final static String QUEUE_NAME = "topic_durable_exchange_queue_2";
          3. private final static String EXCHANGE_NAME = "topic_durable_exchange_test";
          4. public static void main(String[] args) throws IOException, TimeoutException {
          5. // 获取到链接
          6. Connection connection = ConnectionUtil.getConnection();
          7. // 获取通道
          8. Channel channel = connection.createChannel();
          9. // 声明队列
          10. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
          11. // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
          12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
          13. // 定义队列的消费者
          14. DefaultConsumer consumer = new DefaultConsumer(channel) {
          15. // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用
          16. @Override
          17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          18. String msg = new String(body);
          19. System.out.println(" [消费者2] received: " + msg + "!");
          20. }
          21. };
          22. // 监听队列,自动返回完成
          23. channel.basicConsume(QUEUE_NAME, true, consumer);
          24. }
          25. }
  • 当然啦,国内这么hot的rabbitMQ自然也是有集成到了springboot中滴,开心。

    • maven坐标依赖:

      1. <parent>
      2. <groupId>org.springframework.boot</groupId>
      3. <artifactId>spring-boot-starter-parent</artifactId>
      4. <version>2.0.4.RELEASE</version>
      5. </parent>
      6.  
      7. <dependencies>
      8. <dependency>
      9. <groupId>org.springframework.boot</groupId>
      10. <artifactId>spring-boot-starter-amqp</artifactId>
      11. </dependency>
      12. </dependencies>
    • application.yml:

      1. spring:
      2. rabbitmq:
      3. host: 127.0.0.1
      4. username: guest
      5. password: guest
      6. virtual-host: /demo 
      1. @Component
      2. public class Listener {
      3. @RabbitListener(bindings = @QueueBinding(
      4. value = @Queue(value = "spring.test.queue", durable = "true"),
      5. exchange = @Exchange(
      6. value = "spring.test.exchange",
      7. type = ExchangeTypes.TOPIC),
      8. key = {"#.#"}))
      9. public void listen(String msg) {
      10. System.out.println("接收到的消息: " + msg);
      11. }
      12. }
  • 持久化:要将消息持久化,前提是:队列、Exchange都持久化

    • 交换机持久化:channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);  // 参数三:设置durable为true。
    • 队列持久化:channel.queueDeclare(QUEUE_NAME, true, false, false, null);  // 参数二:设置为true,表示设置队列持久化。
    • 消息持久化:channel.basicPublish(EXCHANGE_NAME, “item.insert”, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  • 解决消息丢失?

    • ACK(消费者确认,由消费者向mq发送,防止消息丢失于消费者)
    • 持久化(防止rabbitmq把消息丢失)
    • 生产者确认机制publisher confirm(由mq向生产者发送,有些mq包含,有些不包含,比如:activeMQ不包含该机制,rabbitmq包含该机制)
    • 发送消息前,将消息持久化到数据库,并记录消息状态(可靠消息服务)
  • 思考问题(这个问题就留给你们思考啦?冲啊):如何保证消息发送的重复性,如何保证接口的幂等性(同一接口被重复执行,其结果一致)?【提示:加标识 消息的重发要谨慎】

版权声明:本文为sunxn77原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/sunxn77/p/12740630.html