产生原因

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

RocketMQ概述

RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式 高效的订阅者水平扩展能力 实时的消息订阅机制 亿级消息堆积能力

RocketMQ包含的组件

NameServer:单点,供Producer和Consumer获取Broker地址
Producer:产生并发送消息
Consumer:接受并消费消息
Broker:消息暂存,消息转发

在这里插入图片描述

  • Name Server
    Name Server是RocketMQ的寻址服务。用于把Broker的路由信息做聚合。客户端依靠Name Server决定去获取对应topic的路由信息,从而决定对哪些Broker做连接。
    Name Server是一个几乎无状态的结点,Name Server之间采取share-nothing的设计,互不通信。
    对于一个Name Server集群列表,客户端连接Name Server的时候,只会选择随机连接一个结点,以做到负载均衡。
    Name Server所有状态都从Broker上报而来,本身不存储任何状态,所有数据均在内存。
    如果中途所有Name Server全都挂了,影响到路由信息的更新,不会影响和Broker的通信。

  • Broker
    Broker是处理消息存储,转发等处理的服务器。
    Broker以group分开,每个group只允许一个master,若干个slave。
    只有master才能进行写入操作,slave不允许。
    slave从master中同步数据。同步策略取决于master的配置,可以采用同步双写,异步复制两种。
    客户端消费可以从master和slave消费。在默认情况下,消费者都从master消费,在master挂后,客户端由于从Name Server中感知到Broker挂机,就会从slave消费。
    Broker向所有的NameServer结点建立长连接,注册Topic信息。

1.强调集群无单点,可扩展
2.任意一点高可用,水平可扩展
3.海量消息堆积能力,消息堆积后,写入低延迟。
4.支持上万个队列
5.消息失败重试机制
6.消息可查询
7.开源社区活跃
8.成熟度(经过双十一考验)

队列特点

1、异步,无需等待
在这里插入图片描述
2.解耦,方便系统的扩展

如果没有消息队列,每当一个新的业务接入,我们都要在主系统调用新接口、或者当我们取消某些业务,我们也得在主系统删除某些接口调用。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,接下来收到消息如何处理,是下游的事情,无疑极大地减少了开发和联调的工作量。

3、削峰,高并发流量
在这里插入图片描述

RocketMQ中的消息模型

在这里插入图片描述

  • Producer Group 生产者组:代表某一类的生产者,比如我们有多个秒杀系统作为生产者,这多个合在一起就是一个 Producer Group 生产者组,它们一般生产相同的消息。
  • Consumer Group 消费者组:代表某一类的消费者,比如我们有多个短信系统作为消费者,这多个合在一起就是一个 Consumer Group 消费者组,它们一般消费相同的消息。
  • Topic 主题:代表一类消息,比如订单消息,物流消息等等。

你可以看到图中生产者组中的生产者会向主题发送消息,而 主题中存在多个队列,生产者每次生产消息之后是指定主题中的某个队列发送消息的。
每个主题中都有多个队列(这里还不涉及到 Broker),集群消费模式下,一个消费者集群多台机器共同消费一个 topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 Consumer1 和 Consumer2 分别对应着两个队列,而 Consuer3 是没有队列对应的,所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同 。

每个消费组在每个队列上维护一个消费位置 ,为什么呢?

因为我们刚刚画的仅仅是一个消费者组,我们知道在发布订阅模式中一般会涉及到多个消费者组,而每个消费者组在每个队列中的消费位置都是不同的。如果此时有多个消费者组,那么消息被一个消费者组消费完之后是不会删除的(因为其它消费者组也需要呀),它仅仅是为每个消费者组维护一个 消费位移(offset) ,每次消费者组消费完会返回一个成功的响应,然后队列再把维护的消费位移加一,这样就不会出现刚刚消费过的消息再一次被消费了。

为什么使用rocketmq

  • 是否支持分布式

activemq支持集群(结合zk),但不支持分布式
rocketmq分布式消息中间件,支持集群(主备)、并发量高

  • mq消息堆积,会不会发生宕机

消费者不会宕机,因为存在缓存消息机制
消息中间件可能会宕机
而rocket支持海量消息堆积,支持上万个队列

  • 消息中间件集群策略

使用的不是主从策略,而是均摊测试,提高消息并发量

rocketmq原理

名称 功能
nameServer 存放生产者、消费者投递信息
Broker 消息缓存
Producer 生产者
Consumer 消费者

在这里插入图片描述

  • NameServer:不知道你们有没有接触过 ZooKeeper 和 Spring Cloud 中的 Eureka ,它其实也是一个 注册中心 ,主要提供两个功能:Broker管理 和 路由信息管理 。说白了就是 Broker 会将自己的信息注册到 NameServer 中,此时 NameServer 就存放了很多 Broker 的信息(Broker的路由表),消费者和生产者就从 NameServer 中获取路由表然后照着路由表的信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)。

  • Producer:消息发布的角色,支持分布式集群方式部署。说白了就是生产者。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。说白了就是消费者。

在这里插入图片描述

在这里插入图片描述

  • 第一、我们的 Broker 做了集群并且还进行了主从部署 ,由于消息分布在各个 Broker 上,一旦某个 Broker 宕机,则该Broker 上的消息读写都会受到影响。所以 Rocketmq 提供了 master/slave 的结构,salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,则 slave 提供消费服务,但是不能写入消息 (后面我还会提到哦)。

  • 第二、为了保证 HA ,我们的 NameServer 也做了集群部署,但是请注意它是 去中心化 的。也就意味着它没有主节点,你可以很明显地看出 NameServer 的所有节点是没有进行 Info Replicate 的,在 RocketMQ 中是通过 单个Broker和所有NameServer保持长连接 ,并且在每隔30秒 Broker 会向所有 Nameserver 发送心跳,心跳包含了自身的 Topic 配置信息,这个步骤就对应这上面的 Routing Info 。

  • 第三、在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过 轮询 的方法去向每个队列中生产数据以达到 负载均衡 的效果。

  • 第四、消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 请求来获取消息数据。Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给 同一个消费组中的所有消费者 ,集群模式下消息只会发送给一个消费者。

rocketmq集群搭建

1.7jdk以上,jdk必须64位
配置jvm参数

1、安装maven

镜像地址:
https://mirrors.cnnic.cn/apache/maven

wget https://mirrors.cnnic.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

解压:
tar -zxvf apache-maven-3.6.3-bin.tar.gz

在文件末尾追加环境变量:
vi /etc/profile:
MAVEN_HOME=/usr/local/apache-maven-3.6.3
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin

让配置文件立刻生效:
source /etc/profile

验证:
mvn -v

2、安装rocketmq

wget https://github.com/apache/rocketmq/archive/rocketmq-all-4.2.0.tar.gz

tar -zvxf rocketmq-all-4.2.0.tar.gz

进入解压目录:
mvn -Prelease-all -DskipTests clean install -U


cd distribution/target/apache-rocketmq/
pwd "path"

vim /etc/profile
export rocketmq="path"
export PATH=$PATH:$rocketmq/bin

source /etc/profile


mkdir /usr/local/log/rocketmqlogs


启动nameServer:
nohup mqnamesrv >/usr/local/log/rocketmqlogs/namesrv.log 2>&1 &  
tail -f /user/local/log/rocketmqlogs/namesrv.log

启动broker:
nohup mqbroker -n localhost:9876 >/usr/local/log/rocketmqlogs/broker.log 2>&1 &   
tail -f /user/local/log/rocketmqlogs/broker.log


修改默认RocketMQ内存:
bin目录下的runserver.sh和runbroker.sh文件
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn128m"

关闭:
mqshutdown namesrv
mqshutdown broker

可视化管理控制台RocketMQ Console

2、安装rocketmq(可快速安装)

官网:
http://rocketmq.apache.org/dowloading/releases/
wget http://mirrors.hust.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip

unzip rocketmq-all-4.7.0-bin-release.zip

mv rocketmq-all-4.7.0-bin-release /usr/local/rocketmq 

cd /usr/local/rocketmq/bin

修改内存参数:
vi runserver.sh、runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn128m"


启动nameServer:
cd /usr/local/rocketmq/bin
nohup sh mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &
tail -f /usr/local/rocketmq/logs/mqnamesrv.log

启动broker:
cd /usr/local/rocketmq/bin
nohup sh mqbroker -n localhost:9876 >/usr/local/rocketmq/logs/mqbroker.log 2>&1 & 
tail -f /usr/local/rocketmq/logs/mqbroker.log

安装可视化插件

1、下载包

下载:https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0

2、修改配置

application.properties:
在这里插入图片描述
注释掉plugin,否则会报错:
在这里插入图片描述

3、编译打包

mvn clean package -Dmaven.test.skip=true
生成target目录,启动:
上传到linux服务器:
java -jar rocketmq-console-ng-1.0.0.jar
如果配置文件没有填写Name Server
java -jar rocketmq-console-ng-1.0.0.jar –rocketmq.config.namesrvAddr=‘10.0.74.198:9876;10.0.74.199:9876’

双主模式master搭建

1、配置hosts

vi /etc/hosts
192.168.70.11 rocketmq-nameserver1
192.168.70.11 rocketmq-master1
192.168.70.12 rocketmq-nameserver2
192.168.70.12 rocketmq-master2

重启host服务(centos8):
systemctl start NetworkManager
systemctl restart systemd-hostnamed
测试:
ping rocketmq-nameserver1

2、创建存储路径

mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index

3、RocketMQ配置文件

  • vim conf/2m-noslave/broker-a.properties
  • vim conf/2m-noslave/broker-b.properties
broker-aproperties配置(b类似):
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

4、修改日志配置文件:

mkdir -p /usr/local/rocketmq/logs
cd /usr/local/rocketmq/conf && sed -i ‘s#${user.home}#/usr/local/rocketmq#g’ *.xml

5、启动服务

关闭防火墙:
systemctl stop firewalld.service

启动NameServer:
cd /usr/local/rocketmq/bin
nohup sh mqnamesrv &

启动broker-a\broker-b:
cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
netstat -ntlp
jps
tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

注意事项

服务器a:
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/
broker-a.properties >/dev/null 2>&1 &

服务器b:
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/
broker-b.properties >/dev/null 2>&1 &

在这里插入图片描述
参考:
Linux下RocketMQ下载安装教程
RcoketMq集群安装和RocketMQ


public class Producer {

	public static void main(String[] args) throws MQClientException {
		DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
		producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
		producer.setInstanceName("producer");
		producer.start();
		try {
			for (int i = 0; i < 10; i++) {
				Thread.sleep(1000); // 每秒发送一次MQ
				Message msg = new Message("link-topic", // topic 主题名称
						"TagA", // tag 临时值
						("link-"+i).getBytes()// body 内容
				);
				SendResult sendResult = producer.send(msg);
				System.out.println(sendResult.toString());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		producer.shutdown();
	}

}

消费者

public class Consumer {
	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

		consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
		consumer.setInstanceName("consumer");
		consumer.subscribe("link-topic", "TagA");

		consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				for (MessageExt msg : msgs) {
					System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		consumer.start();
		System.out.println("Consumer Started.");
	}
}

RocketMQ重试机制

MQ 消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。
MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

public class Consumer {
	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
	consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
		consumer.setInstanceName("consumer");
		consumer.subscribe("link-topic", "TagA");

		consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				for (MessageExt msg : msgs) {
					System.out.println(msg.getMsgId() + "---" + new String(msg.getBody()));
				}
				try {
					int i = 1 / 0;
				} catch (Exception e) {
					e.printStackTrace();
                                // 需要重试
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;

				}
                         // 不需要重试
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		consumer.start();
		System.out.println("Consumer Started.");
	}
}

注意:每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。
解决办法:使用自定义全局ID判断幂等,例如流水ID、订单号
使用msg.setKeys 进行区分

public class Producer {

	public static void main(String[] args) throws MQClientException {
		DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
		producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
		producer.setInstanceName("producer");
		producer.start();
		try {
			for (int i = 0; i < 1; i++) {
				Thread.sleep(1000); // 每秒发送一次MQ
				Message msg = new Message("link-topic", // topic 主题名称
						"TagA", // tag 临时值
						("link-6" + i).getBytes()// body 内容
				);
				msg.setKeys(System.currentTimeMillis() + "");
				SendResult sendResult = producer.send(msg);
				System.out.println(sendResult.toString());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		producer.shutdown();
	}

}
消费者:
	static private Map<String, String> logMap = new HashMap<>();

	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

		consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
		consumer.setInstanceName("consumer");
		consumer.subscribe("link-topic", "TagA");

		consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				String key = null;
				String msgId = null;
				try {
					for (MessageExt msg : msgs) {
						key = msg.getKeys();
						if (logMap.containsKey(key)) {
							// 无需继续重试。
							System.out.println("key:"+key+",无需重试...");
							return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
						}
						msgId = msg.getMsgId();
						System.out
版权声明:本文为cndeveloper原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/cndeveloper/p/14347617.html