消息中间件——rocketmq环境配置
产生原因
RocketMQ概述
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式 高效的订阅者水平扩展能力 实时的消息订阅机制 亿级消息堆积能力
RocketMQ包含的组件
NameServer:单点,供Producer和Consumer获取Broker地址
Producer:产生并发送消息
Consumer:接受并消费消息
Broker:消息暂存,消息转发
-
Name Server
Name Server是RocketMQ的寻址服务。用于把Broker的路由信息做聚合。客户端依靠Name Server决定去获取对应topic的路由信息,从而决定对哪些Broker做连接。
Name Server是一个几乎无状态的结点,Name Server之间采取share-nothing的设计,互不通信。
对于一个Name Server集群列表,客户端连接Name Server的时候,只会选择随机连接一个结点,以做到负载均衡。
Name Server所有状态都从Broker上报而来,本身不存储任何状态,所有数据均在内存。
如果中途所有Name Server全都挂了,影响到路由信息的更新,不会影响和Broker的通信。 -
Broker
Broker是处理消息存储,转发等处理的服务器。
Broker以group分开,每个group只允许一个master,若干个slave。
只有master才能进行写入操作,slave不允许。
slave从master中同步数据。同步策略取决于master的配置,可以采用同步双写,异步复制两种。
客户端消费可以从master和slave消费。在默认情况下,消费者都从master消费,在master挂后,客户端由于从Name Server中感知到Broker挂机,就会从slave消费。
Broker向所有的NameServer结点建立长连接,注册Topic信息。
1.强调集群无单点,可扩展
2.任意一点高可用,水平可扩展
3.海量消息堆积能力,消息堆积后,写入低延迟。
4.支持上万个队列
5.消息失败重试机制
6.消息可查询
7.开源社区活跃
8.成熟度(经过双十一考验)
队列特点
1、异步,无需等待
2.解耦,方便系统的扩展
如果没有消息队列,每当一个新的业务接入,我们都要在主系统调用新接口、或者当我们取消某些业务,我们也得在主系统删除某些接口调用。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,接下来收到消息如何处理,是下游的事情,无疑极大地减少了开发和联调的工作量。
3、削峰,高并发流量
RocketMQ中的消息模型
- Producer Group 生产者组:代表某一类的生产者,比如我们有多个秒杀系统作为生产者,这多个合在一起就是一个 Producer Group 生产者组,它们一般生产相同的消息。
- Consumer Group 消费者组:代表某一类的消费者,比如我们有多个短信系统作为消费者,这多个合在一起就是一个 Consumer Group 消费者组,它们一般消费相同的消息。
- Topic 主题:代表一类消息,比如订单消息,物流消息等等。
你可以看到图中生产者组中的生产者会向主题发送消息,而 主题中存在多个队列,生产者每次生产消息之后是指定主题中的某个队列发送消息的。
每个主题中都有多个队列(这里还不涉及到 Broker),集群消费模式下,一个消费者集群多台机器共同消费一个 topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 Consumer1 和 Consumer2 分别对应着两个队列,而 Consuer3 是没有队列对应的,所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同 。
每个消费组在每个队列上维护一个消费位置 ,为什么呢?
因为我们刚刚画的仅仅是一个消费者组,我们知道在发布订阅模式中一般会涉及到多个消费者组,而每个消费者组在每个队列中的消费位置都是不同的。如果此时有多个消费者组,那么消息被一个消费者组消费完之后是不会删除的(因为其它消费者组也需要呀),它仅仅是为每个消费者组维护一个 消费位移(offset) ,每次消费者组消费完会返回一个成功的响应,然后队列再把维护的消费位移加一,这样就不会出现刚刚消费过的消息再一次被消费了。
为什么使用rocketmq
- 是否支持分布式
activemq支持集群(结合zk),但不支持分布式
rocketmq分布式消息中间件,支持集群(主备)、并发量高
- mq消息堆积,会不会发生宕机
消费者不会宕机,因为存在缓存消息机制
消息中间件可能会宕机
而rocket支持海量消息堆积,支持上万个队列
- 消息中间件集群策略
使用的不是主从策略,而是均摊测试,提高消息并发量
rocketmq原理
名称 | 功能 |
---|---|
nameServer | 存放生产者、消费者投递信息 |
Broker | 消息缓存 |
Producer | 生产者 |
Consumer | 消费者 |
-
NameServer:不知道你们有没有接触过 ZooKeeper 和 Spring Cloud 中的 Eureka ,它其实也是一个 注册中心 ,主要提供两个功能:Broker管理 和 路由信息管理 。说白了就是 Broker 会将自己的信息注册到 NameServer 中,此时 NameServer 就存放了很多 Broker 的信息(Broker的路由表),消费者和生产者就从 NameServer 中获取路由表然后照着路由表的信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)。
-
Producer:消息发布的角色,支持分布式集群方式部署。说白了就是生产者。
-
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。说白了就是消费者。
-
第一、我们的 Broker 做了集群并且还进行了主从部署 ,由于消息分布在各个 Broker 上,一旦某个 Broker 宕机,则该Broker 上的消息读写都会受到影响。所以 Rocketmq 提供了 master/slave 的结构,salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,则 slave 提供消费服务,但是不能写入消息 (后面我还会提到哦)。
-
第二、为了保证 HA ,我们的 NameServer 也做了集群部署,但是请注意它是 去中心化 的。也就意味着它没有主节点,你可以很明显地看出 NameServer 的所有节点是没有进行 Info Replicate 的,在 RocketMQ 中是通过 单个Broker和所有NameServer保持长连接 ,并且在每隔30秒 Broker 会向所有 Nameserver 发送心跳,心跳包含了自身的 Topic 配置信息,这个步骤就对应这上面的 Routing Info 。
-
第三、在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过 轮询 的方法去向每个队列中生产数据以达到 负载均衡 的效果。
-
第四、消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 请求来获取消息数据。Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给 同一个消费组中的所有消费者 ,集群模式下消息只会发送给一个消费者。
rocketmq集群搭建
1.7jdk以上,jdk必须64位
配置jvm参数
1、安装maven
镜像地址:
https://mirrors.cnnic.cn/apache/maven
wget https://mirrors.cnnic.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
解压:
tar -zxvf apache-maven-3.6.3-bin.tar.gz
在文件末尾追加环境变量:
vi /etc/profile:
MAVEN_HOME=/usr/local/apache-maven-3.6.3
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
让配置文件立刻生效:
source /etc/profile
验证:
mvn -v
2、安装rocketmq
wget https://github.com/apache/rocketmq/archive/rocketmq-all-4.2.0.tar.gz
tar -zvxf rocketmq-all-4.2.0.tar.gz
进入解压目录:
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq/
pwd "path"
vim /etc/profile
export rocketmq="path"
export PATH=$PATH:$rocketmq/bin
source /etc/profile
mkdir /usr/local/log/rocketmqlogs
启动nameServer:
nohup mqnamesrv >/usr/local/log/rocketmqlogs/namesrv.log 2>&1 &
tail -f /user/local/log/rocketmqlogs/namesrv.log
启动broker:
nohup mqbroker -n localhost:9876 >/usr/local/log/rocketmqlogs/broker.log 2>&1 &
tail -f /user/local/log/rocketmqlogs/broker.log
修改默认RocketMQ内存:
bin目录下的runserver.sh和runbroker.sh文件
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn128m"
关闭:
mqshutdown namesrv
mqshutdown broker
可视化管理控制台RocketMQ Console
2、安装rocketmq(可快速安装)
官网:
http://rocketmq.apache.org/dowloading/releases/
wget http://mirrors.hust.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip
unzip rocketmq-all-4.7.0-bin-release.zip
mv rocketmq-all-4.7.0-bin-release /usr/local/rocketmq
cd /usr/local/rocketmq/bin
修改内存参数:
vi runserver.sh、runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn128m"
启动nameServer:
cd /usr/local/rocketmq/bin
nohup sh mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &
tail -f /usr/local/rocketmq/logs/mqnamesrv.log
启动broker:
cd /usr/local/rocketmq/bin
nohup sh mqbroker -n localhost:9876 >/usr/local/rocketmq/logs/mqbroker.log 2>&1 &
tail -f /usr/local/rocketmq/logs/mqbroker.log
安装可视化插件
1、下载包
下载:https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0
2、修改配置
application.properties:
注释掉plugin,否则会报错:
3、编译打包
mvn clean package -Dmaven.test.skip=true
生成target目录,启动:
上传到linux服务器:
java -jar rocketmq-console-ng-1.0.0.jar
如果配置文件没有填写Name Server
java -jar rocketmq-console-ng-1.0.0.jar –rocketmq.config.namesrvAddr=‘10.0.74.198:9876;10.0.74.199:9876’
双主模式master搭建
1、配置hosts
vi /etc/hosts
192.168.70.11 rocketmq-nameserver1
192.168.70.11 rocketmq-master1
192.168.70.12 rocketmq-nameserver2
192.168.70.12 rocketmq-master2
重启host服务(centos8):
systemctl start NetworkManager
systemctl restart systemd-hostnamed
测试:
ping rocketmq-nameserver1
2、创建存储路径
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
3、RocketMQ配置文件
- vim conf/2m-noslave/broker-a.properties
- vim conf/2m-noslave/broker-b.properties
broker-aproperties配置(b类似):
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
4、修改日志配置文件:
mkdir -p /usr/local/rocketmq/logs
cd /usr/local/rocketmq/conf && sed -i ‘s#${user.home}#/usr/local/rocketmq#g’ *.xml
5、启动服务
关闭防火墙:
systemctl stop firewalld.service
启动NameServer:
cd /usr/local/rocketmq/bin
nohup sh mqnamesrv &
启动broker-a\broker-b:
cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
netstat -ntlp
jps
tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
注意事项
服务器a:
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/
broker-a.properties >/dev/null 2>&1 &
服务器b:
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/
broker-b.properties >/dev/null 2>&1 &
参考:
Linux下RocketMQ下载安装教程
RcoketMq集群安装和RocketMQ
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000); // 每秒发送一次MQ
Message msg = new Message("link-topic", // topic 主题名称
"TagA", // tag 临时值
("link-"+i).getBytes()// body 内容
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("link-topic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
RocketMQ重试机制
MQ 消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。
MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("link-topic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getMsgId() + "---" + new String(msg.getBody()));
}
try {
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
// 需要重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 不需要重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
注意:每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。
解决办法:使用自定义全局ID判断幂等,例如流水ID、订单号
使用msg.setKeys 进行区分
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = 0; i < 1; i++) {
Thread.sleep(1000); // 每秒发送一次MQ
Message msg = new Message("link-topic", // topic 主题名称
"TagA", // tag 临时值
("link-6" + i).getBytes()// body 内容
);
msg.setKeys(System.currentTimeMillis() + "");
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
消费者:
static private Map<String, String> logMap = new HashMap<>();
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("link-topic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
String key = null;
String msgId = null;
try {
for (MessageExt msg : msgs) {
key = msg.getKeys();
if (logMap.containsKey(key)) {
// 无需继续重试。
System.out.println("key:"+key+",无需重试...");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
msgId = msg.getMsgId()