RocketMQ生产消费模型选择
代码参考:
生产者,根据某个标识将消息放到同一个队列中
public class Producer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("10.130.41.36:9876"); producer.setInstanceName("Producer"); producer.setVipChannelEnabled(false); producer.start(); String[] tags = {"tagA","tagB"}; for (int i = 1; i <= 10; i++) { try { Message msg = new Message("TopicTest",tags[i%tags.length],"key1"+i,("订单一号" + i).getBytes()); SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),1); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); } } for (int i = 1; i <= 10; i++) { try { Message msg = new Message("TopicTest",tags[i%tags.length],"key2"+i,("订单二号" + i).getBytes()); SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),2); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); } } for (int i = 1; i <= 10; i++) { try { Message msg = new Message("TopicTest",tags[i%tags.length],"key3"+i,("订单三号" + i).getBytes()); SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),3); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
Topic队列中的内容:
消费者:
一.顺序消费
使用MessageListenerOrderly,顺序消费同一个队列中的数据,只有第一个数据消费成功了才会消费第二个数据。
模拟在消费某个数据时出现了阻塞状态。
public class ConsumerOrderly { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("10.130.41.36:9876"); consumer.setInstanceName("Consumer1"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的 context.setAutoCommit(true); try { for (MessageExt msg:msgs) { String msgKey = msg.getKeys(); if(msgKey.equals("key13") || msgKey.equals("key22")){ Thread.sleep(1000); } System.out.println(" 消费者1 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } //消费成功 return ConsumeOrderlyStatus.SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可 */ consumer.start(); System.out.println("C1 Started."); } }
测试结果如下:
当”订单一号3″没有消费时,他所在队列中后面的数据是不能被消费的。”订单二号2″也是同样的情况。
二. 并发消费
使用MessageListenerConcurrently,并发消费同一个队列中的数据,不能保证消费的顺序。
模拟在消费某个数据时出现了阻塞状态。
public class ConsumerConcurrently { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("10.130.41.36:9876"); consumer.setInstanceName("Consumer1"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg:msgs) { String msgKey = msg.getKeys(); if(msgKey.equals("key13") || msgKey.equals("key22")){ Thread.sleep(1000); } System.out.println(" 消费者1 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("C1 Started."); } }
测试结果如下
当消费”订单二号3″阻塞时,会将后面的数据交给其他线程消费,所以”订单一号4″ 在 “订单一号3″之前消费了。
三.集群消费
不同消费者设置成相同的组名,在MessageModel.CLUSTERING模式下,不同消费者会消费不同的队列,同一个消费者中保证顺序
消费者1
public class ConsumerOrderly_1 {
public static void main(String[] args) throws InterruptedException,
MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("10.130.41.36:9876");
consumer.setInstanceName("Consumer1");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
//设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
context.setAutoCommit(true);
try {
for (MessageExt msg:msgs) {
String msgKey = msg.getKeys();
if(msgKey.equals("key13")){
Thread.sleep(1000);
}
System.out.println(" 消费者1 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
//如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
//消费成功
return ConsumeOrderlyStatus.SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/
consumer.start();
System.out.println("C1 Started.");
}
}
消费者2
public class ConsumerOrderly_2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("10.130.41.36:9876"); consumer.setInstanceName("Consumer2"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的 context.setAutoCommit(true); try { for (MessageExt msg:msgs) { String msgKey = msg.getKeys(); if(msgKey.equals("key22")){ Thread.sleep(1000); } System.out.println(" 消费者2 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } //消费成功 return ConsumeOrderlyStatus.SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可 */ consumer.start(); System.out.println("C2 Started."); } }
测试结果如下:
消费者1负责队列1,并保证队列1中的所有消息是按照顺序消费的
消费者2负责队列2和队列3,根据”订单二号2″可以看出,他保证了队列2和队列3的顺序消费。
四.消费者A和消费者B同组,消费者A消费tagA,消费者B消费tagB如图
在这种情况下,因为集群中订阅消息不一致,导致消费出现问题,最后启动的消费者才可以正常消费消息。
要解决这个问题,需要保证集群中的消费者拥有统一的订阅消息,Topic和Tag要一致才可以。
参考:
https://www.jianshu.com/p/524ef06ce25a
https://mp.weixin.qq.com/s/HbIS0yEJsCPMYwwYDBIvMQ
五. 消费者A和消费者B不同组,消费者A消费tagA,消费者B消费tagB如图
在消费者1中,能保证tagA1,tagA2顺序的消费,消费者2中能保证tagB1,tagB2顺序的消费。
但是不能保证tagA1和tagB1的消费顺序。
测试代码:
消费者1
public class ConsumerOrderly_1 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("10.130.41.36:9876"); consumer.setInstanceName("Consumer1"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TopicTest", "tagA"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的 context.setAutoCommit(true); try { for (MessageExt msg:msgs) { System.out.println(" 消费者1 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } //消费成功 return ConsumeOrderlyStatus.SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可 */ consumer.start(); System.out.println("C1 Started."); } }
消费者2
public class ConsumerOrderly_2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName1"); consumer.setNamesrvAddr("10.130.41.36:9876"); consumer.setInstanceName("Consumer2"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TopicTest", "tagB"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的 context.setAutoCommit(true); try { for (MessageExt msg:msgs) { String msgKey = msg.getKeys(); if(msgKey.equals("key11")){ Thread.sleep(1000); } System.out.println(" 消费者2 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } //消费成功 return ConsumeOrderlyStatus.SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可 */ consumer.start(); System.out.println("C2 Started."); } }
测试结果:
消费者1
消费者2
“订单一号2” 在 “订单一号1” 前被消费了。