rocketMQ
1、RocketMQ结构
nameserver从broker获取topic信息,producer通过topic确定将消息发送到不同broker;broker中有多个messagequeue,消息发送到broker后经过轮询算法、hash算法等将消息发送到不同的messagequeue上。messagequeue本身不存放消息,真正的消息存放在commitlog中,messagequeue只存放消息在commitlog中的对应位置信息,通过messagequeue找到对应存储在messagelog中数据;
不同topic、messagequeue消息都写到相同commitlog文件,即commitlog顺序消费;
rocketmq异常关闭重启后,如何寻找上次消费位置:rocketmq每次消费都有对应一个offset值,每消费一个offset++,重启后可以根据offset位置开始消费,有可能存在重复消费问题,可以通过幂等业务逻辑在消费端去重
producer将消息发往不同topic,一个topci对应多个queue,如果不指定,rocketmq采用轮询方式将同一个topic的消息发送到不同的queue上,如果需要发送有序消息,则需要将需要排序消息发送到相同的queue上,这样才能保证消息有序性
参考(https://www.jianshu.com/p/2838890f3284)
1.1 Rocketmq消息发送与订阅
生产者和消费者设置相同nameserver地址,生产者向从nameserver中获取对应topic的broker发送消息;消费者同样是这个nameserver中,消费者订阅topic,可以指定tag
生产者:
producer.setNamesrvAddr(nameserAddr);
producer.setInstanceName(instanceName);
消费者:
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe(topic, tag);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //如果非第一次启动,按照上次消费位置继续消费
生产者消费者通过指定相同nanmeserver、topic实现消息发送与消费
1.2 Rocketmq消息存储
rocketMQ消息存储在commitlog中,消息顺序写入commitlog中
consumequeue:消息消费队列,topic下每个queue都有对应的一个consumequeue,引入目的是为了提高消息消费性能,consumequeue是消息的逻辑队列,相当于字典,指定消息在commitlog上存储的地址。消费者是基于topic订阅消息的,如果遍历commitlog会效率比较低,而consumerqueue保存了topic对应消息在comitlog起始地址offset、消息大小size以及消息tag的hash值,提高消息消费效率
rocketmq采用混合存储结构:数据和索引分开存储,而且不同topic消息都会存储在同一个commitlog中
1.3 消息消费进度
消费时候消费端都会注册消息监听器:
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); doMyJob();//执行真正消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
只有返回success状态,rocketmq才会认为这批消息消费完成,此时offset++
如果消费端异常没有消费,重启后会获取offset值,重新消费,有可能会出现重复消费清醒
如果broker异常后,消费端会无限发送retry主题的消息,无限发送不成功的情况,不眠不休,无限循环。直到broker-1的master启动成功,或者ConsumerA关闭
消费端消费完成之后会回发回broker一个状态:ConsumeConcurrentlyStatus.xxxxx
①消费端消费成功,回发成功:消费完一批消息,消费进度offset+1,标示消息已被处理
②消费端代码异常,回发成功:消费端也会认为这条异常消费已经消费了,消费进度offset+1,消费端异常的消息已经转生在%RETRY%XX
topic里作为新消息等待消费了(重试消费),
③消费端宕机、brokerr宕机、停机异常:消费进度offset不变,下次启动时从该处在开始消费,可能会出现重复消费清醒,这种采用消费端幂等(唯一索引)处理
1.4 消息堆积问题
2、RocketMQ顺序消费(MessagequeueSelector)
RocketMQ中同一个队列不能被并行消费,但可以并行消费多个队列。基于此,Rocket可以保证将需要排序的内容放在同一个队列中便可以保证消费的顺序进行。举个例子,对于同一订单必须按照:订单创建——》订单付款——》订单发货方式进行,但对于不同订单之间可以并行消费的。
同一个队列只能被一个消费端消费
rocketMQ发送端源码:
/** * 生产者1 */ @Test public void send1() { try { DefaultMQProducer producer = new DefaultMQProducer("producerGroup1"); producer.setNamesrvAddr("192.168.229.5:9876;192.168.229.6:9876"); producer.setRetryTimesWhenSendFailed(3); producer.start(); String[] tags = new String[]{"创建订单", "支付", "发货", "收货", "五星好评"}; for (int i = 5; i < 25; i++) { int orderId = i / 5; Message msg = new Message("OrderTopic1", tags[i % tags.length], "uniqueId:" + i, ("order_" + orderId + " " + tags[i % tags.length]).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //此刻arg == orderId,可以保证是每个订单进入同一个队列 Integer id = (Integer) arg; //arg就是orderid int index = id % mqs.size(); //orderi取模消息队列个数 return mqs.get(index); //返回哪一个消息队列 } }, orderId); System.out.printf("%s%n", sendResult); } producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } } producer.send()代码: @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); } 代码解析: 1、producer.send有三个参数producer(msg,MessageQueueSelector,arg1) ①msg:mq发送的内容; ③arg1 一个obj对象,作用就是messageQueueSelector中方法select(List<MessageQueue> mqs, Message msg, Object arg))第三个参数 ②messageQueueSelector:消息队列选择器,这里是让用户决定消息msg发送到哪一个queue上。根据源码我们可以看出select方法返回的是哪一个具体的消息队列,如果同一个orderid则返回的是同一个消息队列,根据queue先进先出特性便可以保证发送消息的顺序性 2、这里是按照orderid排序的,如果业务开发中需要按照其他指标排序,只需修改send()方法第三个参数orderid为对应参数即可
View Code
上边代码中比较绕一个地方值的注意:msg中tag是按照i取模(i%5),而orderid是i整除tag.len(i/5),这样5~9是同一orderid,这样确定同一个队列,而tag分别是按照tag[0]、tag[1]…tag[4]依次放入同一个队列;
使用hash取模法,让同一个订单发送到同一个queue中,再使用同步发送,只有消息A发送成功,再发送消息B,这样,我们保证了发送有序.
rocketmq的topic内的队列机制,可以保证存储满足FIFO,剩下的只需要消费者顺序消费即可
rocketMQ消费端源码
/** * 订阅 */ @Test public void consumer1() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.229.5:9876;192.168.229.6:9876"); try { //设置Consumer从哪开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("OrderTopic1", "*"); // 实现了MessageListenerOrderly表示一个队列只会被一个线程取到, 第二个线程无法访问这个队列,MessageListenerOrderly默认单线程 // consumer.setConsumeThreadMin(3); // consumer.setConsumeThreadMax(6); consumer.registerMessageListener(new MessageListenerOrderly() { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { try { System.out.println("orderInfo: " + new String(msgs.get(0).getBody(), "utf-8")); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Consumer1 Started."); while (true) { } }
View Code
没有保证全局有序,但局部有序,每个订单的操作消息保证有序了,这种局部顺序满足高并发,符合预期.
3、rocketMQ事务问题(https://blog.csdn.net/weixin_40533111/article/details/84587967)
先发送PREPARED消息,返回其CommitLog Offset
执行本地逻辑,得到处理结果是Commit还是Rollback
将处理结果和CommitLog Offset发送到Broker
Broker先根据Offset从CommitLog中提起PREPARED消息,然后克隆此消息生成新的消息,消息Body(内容)和PREPARED的一致。先设置处理结果标识,然后根据处理结果,如果是Rollback,则清空body,否则不清空,最后存储进CommitLog
Broker在存储PREPARED消息时,不会将其PositionInfo存入ConsumeQueue,也就是正常情况下此消息不会被消费;但会为其生成IndexInfo存入IndexFile,也就是能通过key查询此消息。
当事务处理结果是Rollback时,克隆消息不会生成PositionInfo和IndexInfo,所以此消息不会被消费,不能被查询,事务流程就此结束;当事务处理结果是Commit时,克隆消息会生成PositionInfo和IndexInfo,也就是能被正常消费,也能正常查询。
事务消息的确认(Commit/Rollback)的执行方式是Oneway形式,也就是单向执行,没有结果返回,这种形式执行效率很高,但是有个问题就是不确定确认操作是否执行成功,可能因为网络问题或者Broker问题造成发送失败,消息回查就是解决这个问题,但现在没有了,所以需要我们自己设计解决。
事务确认操作失败,有两种补偿操作,一种是重新发送一次事务消息,另一种是去查询PREPARED消息。
3.1 producer发送消息事务(发送消息到broker)
producer采用TransactionMQProducer ;consumer采用普通consumer
【理论】
①producer发送Prepared消息
②发送Prepared成功前提下执行本地事务;如果Prepared失败则不执行本地事务,并且将本地事务状态(localtransactionstate)置为ROLLBACK_MESSAGE
③根据本地事务执行结果(localtransactionstate状态),决定向broker发送confirm还是cancel指令;如果是commit则broker决定将消息投递给consumer,如果是cancel则broker不将消息投递给comsumer
【源码】
具体代码如下:
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { if (null == tranExecuter) { throw new MQClientException("tranExecutor is null", null); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 标记消息是half消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { // 发送half消息,该方法是同步发送,事务消息也必须是同步发送 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { // 只有在half消息发送成功的时候才会执行事务 try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } // 执行本地事务 localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { // 根据事务commit的情况来判断下一步操作 this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; } public void endTransaction( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; // 从broker返回的信息中获取half消息的offset if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); // 需要把transactionId和offset发送给broker,便于broker查找half消息 requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: // 表明本地址事务成功commit,告诉broker可以提交事务 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: // 说明事物需要回滚,有可能是half消息发送失败,也有可能是本地事务执行失败 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: // 如果状态是UNKNOW,broker还会反查producer,也就是接口:org.apache.rocketmq.example.transaction.TransactionCheckListenerImpl#checkLocalTransactionState的作用,但是目前rmq4.2.0并没有向producer查询,也就是源码中都没有调用这个接口 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; // 这个发送消息是onway的,也就是不会等待返回 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
View Code
对比发送普通消息VS 事务(half)消息
普通消息发送:
private DefaultMQProducer defaultMQProducer; //普通rocketmq生产者
SendResult sendResult=defaultMQProducer.send(msg);
half消息发送:
private TransactionMQProducer transactionMQproducer ;
SendResult sendResult = transactionMQproducer .sendMessageInTransaction(msg, transactionExecuter, “TopicTransaction”);
half消息和普通消息不一样,half消息执行send方法发送后会被消费,而half消息执行sendMessageInTransaction后并不会被consumer消费。原因是broker在将消息写入commitlog时候会判断消 息类型,普通消息发送他的消息类型是Transaction_not_type;half消息发送他的消息类型可能有三种类型transaction_prepared_type、transaction_rollback_type、transaction_commit_type,如果 是transaction_prepare_type和transaction_rollback_type类型时comsumeQueue的queueoffset(queueoffset对应消息在commitlog位置)不会增加,而comsumer在消费时会先读取 comsuerQueue中queueoffset值,根据queueoffset值去commitlog中读取对应消息。所以comsumer在拉取消息时不会拉取到prepared和rollback的消息。
相关代码如下:
/* *第一步:TRANSACTION_PREPARED_TYPEconsumerqueue中queueoffset和TRANSACTION_ROLLBACK_TYPE不递增 */ switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); break; default: break; } // org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch /* *第二步:TRANSACTION_PREPARED_TYPEconsumerqueue中queueoffset和TRANSACTION_ROLLBACK_TYPE不读取 */ public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } }
View Code
以上两点保证了prepare消息也就是half消息不会被消费。
3.2 broker处理结束事务消息
producer端执行endtransaction()方法后,会将请求发往broker
broker端收到endtransaction()后,调用EndTransactionProcessor.java中EndTransactionProcessor.processRequest(ChannelHandlerContext ctx, RemotingCommand request)方法将事务消息写入commitlog中并生成consumequeue和index数据,供consumer消费
3.3 broker发起事务消息回查机制
3.1节中,如果endTransaction()方法执行失败,导致数据没有发送到broker,broker会有回查线程定时扫描每个存储事务状态表,如果是commit或者cancel状态消息则直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,producer会调用defaultMQProducerImpl.checkTransactionState()方法来处理broker定时回调请求,checkTransactionState会调用我们的事务设置的决断方法来决定是回滚事务还是继续执行,最后调用endTransactionOneway让broker来更新消息最终状态
流程图:
代码:
@Override 10: public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { 11: Runnable request = new Runnable() { 12: private final String brokerAddr = addr; 13: private final MessageExt message = msg; 14: private final CheckTransactionStateRequestHeader checkRequestHeader = header; 15: private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); 16: 17: @Override 18: public void run() { 19: TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); 20: if (transactionCheckListener != null) { 21: // 获取事务执行状态 22: LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; 23: Throwable exception = null; 24: try { 25: localTransactionState = transactionCheckListener.checkLocalTransactionState(message); 26: } catch (Throwable e) { 27: log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); 28: exception = e; 29: } 30: 31: // 处理事务结果,提交消息 COMMIT / ROLLBACK 32: this.processTransactionState(// 33: localTransactionState, // 34: group, // 35: exception); 36: } else { 37: log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); 38: } 39: } 40: 41: /** 42: * 处理事务结果,提交消息 COMMIT / ROLLBACK 43: * 44: * @param localTransactionState 【本地事务】状态 45: * @param producerGroup producerGroup 46: * @param exception 检查【本地事务】状态发生的异常 47: */ 48: private void processTransactionState(// 49: final LocalTransactionState localTransactionState, // 50: final String producerGroup, // 51: final Throwable exception) { 52: final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); 53: thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); 54: thisHeader.setProducerGroup(producerGroup); 55: thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); 56: thisHeader.setFromTransactionCheck(true); 57: 58: // 设置消息编号 59: String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); 60: if (uniqueKey == null) { 61: uniqueKey = message.getMsgId(); 62: } 63: thisHeader.setMsgId(uniqueKey); 64: 65: thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); 66: switch (localTransactionState) { 67: case COMMIT_MESSAGE: 68: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); 69: break; 70: case ROLLBACK_MESSAGE: 71: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); 72: log.warn("when broker check, client rollback this transaction, {}", thisHeader); 73: break; 74: case UNKNOW: 75: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); 76: log.warn("when broker check, client does not know this transaction state, {}", thisHeader); 77: break; 78: default: 79: break; 80: } 81: 82: String remark = null; 83: if (exception != null) { 84: remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); 85: } 86: 87: try { 88: // 提交消息 COMMIT / ROLLBACK 89: DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 90: 3000); 91: } catch (Exception e) { 92: log.error("endTransactionOneway exception", e); 93: } 94: } 95: }; 96: 97: // 提交执行 98: this.checkExecutor.submit(request); 99: } 100: 101: // :arrow_down::arrow_down::arrow_down:【DefaultMQProducerImpl.java】 102: /** 103: * 【事务消息回查】检查监听器 104: */ 105: public interface TransactionCheckListener { 106: 107: /** 108: * 获取(检查)【本地事务】状态 109: * 110: * @param msg 消息 111: * @return 事务状态 112: */ 113: LocalTransactionState checkLocalTransactionState(final MessageExt msg); 114: 115: }
View Code
3.4消息发送成功后消费端消费超时以及消费失败解决方案: