MQ,是一种跨进程的通信机制,用于上下游传递消息。在传统的互联网架构中通常使用MQ来对上下游来做解耦合。

举例:当A系统对B系统进行消息通讯,如A系统发布一条系统公告,B系统可以订阅该频道进行系统公告同步,整个过程中A系统并不关系B系统会不会同步,由订阅该频道的系统自行处理。

官方说明:

随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。

看到这里可以很清楚的知道RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。

具有以下特性:

  • 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  • 能够保证严格的消息顺序,在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  • 提供丰富的消息拉取模式,支持拉(pull)和推(push)两种消息模式
  • 单一队列百万消息的堆积能力,亿级消息堆积能力
  • 支持多种消息协议,如 JMS、MQTT 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义

下载地址:https://rocketmq.apache.org/dowloading/releases/

从官方下载二进制或者源码来进行使用。源码编译需要Maven3.2x,JDK8

在根目录进行打包:

  1. mvn -Prelease-all -DskipTests clean packager -U

distribution/target/apache-rocketmq文件夹中会存在一个文件夹版,zip,tar三个可运行的完整程序。

使用rocketmq-4.6.0.zip:

  1. 启动名称服务 mqnamesrv.cmd
  2. 启动数据中心 mqbroker.cmd -n localhost:9876

SpringBoot 入门:https://www.cnblogs.com/SimpleWu/p/10027237.html
SpringBoot 常用start:https://www.cnblogs.com/SimpleWu/p/9798146.html
当前环境版本为:

  • SpringBoot 2.0.6.RELEASE
  • SpringCloud Finchley.RELEASE
  • SpringCldod Alibaba 0.2.1.RELEASE
  • RocketMQ 4.3.0
    在项目工程中导入:
  1. <!-- MQ Begin -->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>${rocketmq.version}</version>
  6. </dependency>
  7. <!-- MQ End -->

由于我们这边已经有工程了所以就不在进行创建这种过程了。主要是看看如何使用RocketMQ。
创建RocketMQProperties配置属性类,类中内容如下:

  1. @ConfigurationProperties(prefix = "rocketmq")
  2. public class RocketMQProperties {
  3. private boolean isEnable = false;
  4. private String namesrvAddr = "localhost:9876";
  5. private String groupName = "default";
  6. private int producerMaxMessageSize = 1024;
  7. private int producerSendMsgTimeout = 2000;
  8. private int producerRetryTimesWhenSendFailed = 2;
  9. private int consumerConsumeThreadMin = 5;
  10. private int consumerConsumeThreadMax = 30;
  11. private int consumerConsumeMessageBatchMaxSize = 1;
  12. //省略get set
  13. }

现在我们所有子系统中的生产者,消费者对应:
isEnable 是否开启mq
namesrvAddr 集群地址
groupName 分组名称
设置为统一已方便系统对接,如有其它需求在进行扩展,类中我们已经给了默认值也可以在配置文件或配置中心中获取配置,配置如下:

  1. #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
  2. rocketmq.groupName=please_rename_unique_group_name
  3. #是否开启自动配置
  4. rocketmq.isEnable=true
  5. #mq的nameserver地址
  6. rocketmq.namesrvAddr=127.0.0.1:9876
  7. #消息最大长度 默认1024*4(4M)
  8. rocketmq.producer.maxMessageSize=4096
  9. #发送消息超时时间,默认3000
  10. rocketmq.producer.sendMsgTimeout=3000
  11. #发送消息失败重试次数,默认2
  12. rocketmq.producer.retryTimesWhenSendFailed=2
  13. #消费者线程数量
  14. rocketmq.consumer.consumeThreadMin=5
  15. rocketmq.consumer.consumeThreadMax=32
  16. #设置一次消费消息的条数,默认为1条
  17. rocketmq.consumer.consumeMessageBatchMaxSize=1

创建消费者接口 RocketConsumer.java 该接口用户约束消费者需要的核心步骤:

  1. /**
  2. * 消费者接口
  3. *
  4. * @author SimpleWu
  5. *
  6. */
  7. public interface RocketConsumer {
  8. /**
  9. * 初始化消费者
  10. */
  11. public abstract void init();
  12. /**
  13. * 注册监听
  14. *
  15. * @param messageListener
  16. */
  17. public void registerMessageListener(MessageListener messageListener);
  18. }

创建抽象消费者 AbstractRocketConsumer.java:

  1. /**
  2. * 消费者基本信息
  3. *
  4. * @author SimpelWu
  5. */
  6. public abstract class AbstractRocketConsumer implements RocketConsumer {
  7. protected String topics;
  8. protected String tags;
  9. protected MessageListener messageListener;
  10. protected String consumerTitel;
  11. protected MQPushConsumer mqPushConsumer;
  12. /**
  13. * 必要的信息
  14. *
  15. * @param topics
  16. * @param tags
  17. * @param consumerTitel
  18. */
  19. public void necessary(String topics, String tags, String consumerTitel) {
  20. this.topics = topics;
  21. this.tags = tags;
  22. this.consumerTitel = consumerTitel;
  23. }
  24. public abstract void init();
  25. @Override
  26. public void registerMessageListener(MessageListener messageListener) {
  27. this.messageListener = messageListener;
  28. }
  29. }

在类中我们必须指定这个topics,tags与消息监听逻辑
public abstract void init();该方法是用于初始化消费者,由子类实现。
接下来我们编写自动配置类RocketMQConfiguation.java,该类用户初始化一个默认的生产者连接,以及加载所有的消费者。
@EnableConfigurationProperties({ RocketMQProperties.class }) 使用该配置文件
@Configuration 标注为配置类
@ConditionalOnProperty(prefix = “rocketmq”, value = “isEnable”, havingValue = “true”) 只有当配置中指定rocketmq.isEnable = true的时候才会生效
核心内容如下:

  1. /**
  2. * mq配置
  3. *
  4. * @author SimpleWu
  5. */
  6. @Configuration
  7. @EnableConfigurationProperties({ RocketMQProperties.class })
  8. @ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
  9. public class RocketMQConfiguation {
  10. private RocketMQProperties properties;
  11. private ApplicationContext applicationContext;
  12. private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);
  13. public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) {
  14. this.properties = properties;
  15. this.applicationContext = applicationContext;
  16. }
  17. /**
  18. * 注入一个默认的消费者
  19. * @return
  20. * @throws MQClientException
  21. */
  22. @Bean
  23. public DefaultMQProducer getRocketMQProducer() throws MQClientException {
  24. if (StringUtils.isEmpty(properties.getGroupName())) {
  25. throw new MQClientException(-1, "groupName is blank");
  26. }
  27. if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
  28. throw new MQClientException(-1, "nameServerAddr is blank");
  29. }
  30. DefaultMQProducer producer;
  31. producer = new DefaultMQProducer(properties.getGroupName());
  32. producer.setNamesrvAddr(properties.getNamesrvAddr());
  33. // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
  34. // 如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
  35. // producer.setInstanceName(instanceName);
  36. producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
  37. producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
  38. // 如果发送消息失败,设置重试次数,默认为2次
  39. producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());
  40. try {
  41. producer.start();
  42. log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),
  43. properties.getNamesrvAddr());
  44. } catch (MQClientException e) {
  45. log.error(String.format("producer is error {}", e.getMessage(), e));
  46. throw e;
  47. }
  48. return producer;
  49. }
  50. /**
  51. * SpringBoot启动时加载所有消费者
  52. */
  53. @PostConstruct
  54. public void initConsumer() {
  55. Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
  56. if (consumers == null || consumers.size() == 0) {
  57. log.info("init rocket consumer 0");
  58. }
  59. Iterator<String> beans = consumers.keySet().iterator();
  60. while (beans.hasNext()) {
  61. String beanName = (String) beans.next();
  62. AbstractRocketConsumer consumer = consumers.get(beanName);
  63. consumer.init();
  64. createConsumer(consumer);
  65. log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,
  66. consumer.topics);
  67. }
  68. }
  69. /**
  70. * 通过消费者信心创建消费者
  71. *
  72. * @param consumerPojo
  73. */
  74. public void createConsumer(AbstractRocketConsumer arc) {
  75. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
  76. consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
  77. consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
  78. consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
  79. consumer.registerMessageListener(arc.messageListenerConcurrently);
  80. /**
  81. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
  82. */
  83. // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  84. /**
  85. * 设置消费模型,集群还是广播,默认为集群
  86. */
  87. // consumer.setMessageModel(MessageModel.CLUSTERING);
  88. /**
  89. * 设置一次消费消息的条数,默认为1条
  90. */
  91. consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
  92. try {
  93. consumer.subscribe(arc.topics, arc.tags);
  94. consumer.start();
  95. arc.mqPushConsumer=consumer;
  96. } catch (MQClientException e) {
  97. log.error("info consumer title {}", arc.consumerTitel, e);
  98. }
  99. }
  100. }

然后在src/main/resources文件夹中创建目录与文件META-INF/spring.factories里面添加自动配置类即可开启启动配置,我们只需要导入依赖即可:

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2. com.xcloud.config.rocketmq.RocketMQConfiguation

接下来在服务中导入依赖,然后通过我们的抽象类获取所有必要信息对消费者进行创建,该步骤会在所有消费者初始化完成后进行,且只会管理是Spring Bean的消费者。
下面我们看看如何创建一个消费者,创建消费者的步骤非常简单,只需要继承AbstractRocketConsumer然后再加上Spring的@Component就能够完成消费者的创建,我们可以在类中自定义消费的主题与标签。
在项目可以根据需求当消费者创建失败的时候是否继续启动工程。
创建一个默认的消费者 DefaultConsumerMQ.java

  1. @Component
  2. public class DefaultConsumerMQ extends AbstractRocketConsumer {
  3. /**
  4. * 初始化消费者
  5. */
  6. @Override
  7. public void init() {
  8. // 设置主题,标签与消费者标题
  9. super.necessary("TopicTest", "*", "这是标题");
  10. //消费者具体执行逻辑
  11. registerMessageListener(new MessageListenerConcurrently() {
  12. @Override
  13. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  14. msgs.forEach(msg -> {
  15. System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));
  16. });
  17. // 标记该消息已经被成功消费
  18. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  19. }
  20. });
  21. }
  22. }

super.necessary(“TopicTest”, “*”, “这是标题”); 是必须要设置的,代表该消费者监听TopicTest主题下所有tags,标题那个字段是我自己定义的,所以对于该配置来说没什么意义。
我们可以在这里注入Spring的Bean来进行任意逻辑处理。
创建一个消息发送类进行测试

  1. @Override
  2. public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
  3. Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET));
  4. // 发送消息到一个Broker
  5. SendResult sendResult = defaultMQProducer.send(msg);
  6. // 通过sendResult返回消息是否成功送达
  7. System.out.printf("%s%n", sendResult);
  8. return null;
  9. }

我们来通过Http请求测试:

  1. http://localhost:10001/demo/base/mq/hello consumer message boyd hello
  2. http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿

好了到这里简单的start算是设计完成了,后面还有一些:顺序消息生产,顺序消费消息,异步消息生产等一系列功能,官人可参照官方去自行处理。

  • ActiveMQ 没经过大规模吞吐量场景的验证,社区不高不活跃。
  • RabbitMQ 集群动态扩展麻烦,且与当前程序语言不至于难以定制化。
  • kafka 支持主要的MQ功能,功能无法达到程序需求的要求,所以不使用,且与当前程序语言不至于难以定制化。
  • rocketMQ MQ功能较为完善,还是分布式的,扩展性好;支持复杂MQ业务场景。(业务复杂可做首选)

版权声明:本文为SimpleWu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/SimpleWu/p/12112351.html