了解一下RabbitMQ
RabbitMQ概述
RabbitMQ是遵从AMQP协议的 通信协议都设计到报文交互,换句话说RabbitMQ就是AMQP协议的Erlang的实现。
AMQP说到底还是一个通信协议从low-level层面举例来说,AMQP本身是应用层的协议,其填充于TCP协议的数据部分。
从high-level层面来说,AMQP是通过协议命令进行交互的。命令类似HTTP中的方法(GET PUT POST DELETE等)。
信道(Channel)在AMQP是一个很重要的概念,大多数操作都是在信道这个层面展开的
我们完全可以用Connection就能完成信道的工作,为什么还要引入信道?
试想:一个程序中有很多个线程需要从RabbitMQ中消费消息,或者生产消息,那么必然需要建立很多个Connection,也就是多个TCP连接。
建立和销毁TCP连接开销很昂贵。所以RabbitMQ采用类似NIO的做法,选择TCP连接复用。不仅可以减少性能开销,同时也便于管理。
发布订阅模式
广播模式 topic
所谓广播指的是一条消息将被所有的消费者进行处理。
直连模式 director
直连模式的特点主要就是routingkey的使用,如果现在该消息就要求指定一个具备有指定Routingkey的操作者进行处理,那么只需要两个的Routingkey匹配即可。
可以将Routingkey比喻一个唯一标记,这样就可以将消息准确的推送到消费者手中了。
主题模式 fanout
主题模式类似于广播模式与直连模式的整合操作,所有的消费者都可以接收到主题信息,但是如果要想进行正确的处理,则一定需要有一个合适的Routingkey完成操作。
交换器相当于投递包裹的邮箱,Routingkey相当于包裹的地址,BindingKey相当于包裹的目的地。
当填写在包裹上的地址和要投递的地址相匹配时,那么这个包裹就会正确投递到目的地,最后这个目的地的主人(队列)可以保留这个包裹。
如果填写地址出错,邮递员不能正确的投递到目的地,包裹可能被退回给寄件人,也有可能被丢弃。
RabbitMQ官方文档和API都把Routingkey和BingdingKey都看做Routingkey下面代码中红色部分 就都当Routingkey使用
消息生产者
public class MessageProducer { private static final String EXCHANGE_NAME ="com.sunkun.topic";//消息队列名称 private static final String HOST="192.168.1.105"; private static final int PORT=5672; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory();//建立一个连接工厂 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername("sunkun"); factory.setPassword("123456"); //factory.setVirtualHost(virtualHost) 使用虚拟主机的最大好处 可以区分不同用户的操作空间 每一个虚拟主机有一个自己的空间管理 Connection conn = factory.newConnection();//定义一个新的RabbitMQ的连接 Channel channel = conn.createChannel();//创建一个通讯的通道 //定义该通道要使用的队列名称 此时队列已经创建过了 //第一个参数 队列名称(这个队列可能存在也可能不存在) //第二个参数 是否持久保存 //第三个参数 此队列是否为专用的队列信息 //第四个参数 是否允许自动删除 //channel.queueDeclare(QUENE_NAME, true, false, true,null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); long start = System.currentTimeMillis(); System.out.println("消息开始"+start); for(int i=0;i<1000;i++){ String message = "sk - "+i; if(i%2==0){ //MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化 channel.basicPublish(EXCHANGE_NAME, "sk1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//进行消息发送 }else{ channel.basicPublish(EXCHANGE_NAME, "sk2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//进行消息发送 } } long end = System.currentTimeMillis(); System.out.println("消息花费时间"+(end-start)); channel.close(); } }
消息消费者
public class MessageConsumer { private static final String EXCHANGE_NAME ="com.sunkun.topic";//消息队列名称 private static final String HOST="192.168.1.105"; private static final int PORT=15672; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory();//建立一个连接工厂 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername("sunkun"); factory.setPassword("123456"); Connection conn = factory.newConnection();//定义一个新的RabbitMQ的连接 Channel channel = conn.createChannel();//创建一个通讯的通道 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue();//通过通道获取一个队列名称 channel.queueBind(queueName, EXCHANGE_NAME, "sk2");//进行绑定处理 //在RabbitMQ里面,所有的消费者信息是通过一个回调方法完成的 Consumer consumer = new DefaultConsumer(channel){//需要复写指定的方法实现消息处理 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费者sk2:"+message);//可以启动多个消费者 super.handleDelivery(consumerTag, envelope, properties, body); } }; channel.basicConsume(queueName,consumer); } }
MQ保证消息的可靠性
1)持久化
持久化可以提高RabbitMQ的可靠性,防止在异常情况(重启,关闭,宕机)下的数据丢失。
持久化可分为三个部分:交换器的持久化,队列的持久化和消息的持久化。
交换器的持久化:是通过声明交换器时将druable参数设置为true来实现的。如果交换器不设置持久化,那么在RabbitMQ重启之后相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。
对于一个长期使用的交换器来说,建议其置为持久化。(消息不直接往队列发,往exchange发送 可以实现广播模式)
队列的持久化:是通过声明队列时将durable参数置为true实现的。如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
消息的持久化:因为队列的持久化能保证其本身的元数据不会因为异常情况而丢失,但是不能保证内部存储的消息不会丢失。要确保消息不会丢失,需求将其设置为持久化。
通过将消息的投递模式(MessageProperties.PERSISTENT_TEXT_PLAIN)即可实现消息的持久化
2)集群
RabbitMQ的集群本身不带有所谓的HA机制以及负载均衡机制
本文主要讲镜像队列
在持久化的消息正确存入到RabbitMQ之后 还需要一段时间(虽然时间很短,但不可忽视)才能存入磁盘中,如果这段时间发生了宕机,消息保存还没来得及落盘,那么这些消息将会丢失。
这里可以引入RabbitMQ的镜像队列机制,相当于配置了副本,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(Slave),在实际生产环境中的关键业务队列都会设置镜像队列。
提醒:所谓的镜像队列只是进行数据的副本而已,在所谓的RabbitMQ集群里面并不支持HA机制以及所谓的负载均衡,如果说现在一台主机挂掉了,那么其他主机肯定无法进行合理读取的。
如果想要安全的使用RabbitMQ就要继续追加负载均衡组件,列如HAProxy LVS等等,如果要保证负载均衡组件的高可用,还应该继续追加KeepAlive组件。
3)生产者确认
除上面两个问题外 我们还遇到一个新问题:当消息的生产者将消息发送出去之后,消息到底有没有正确的到达服务器呢?
如果消息到达服务器之前就丢失,那么持久化也解决不了问题,因为消息就没有到达服务器,何谈持久化呢。
通常会有两种方法解决此问题一时事物机制,只有消息被成功接收,事物才能提交成功,否则便可在捕获异常之后进行事物回滚,于此同时可以进行消息重发。
但使用事物机制会大大降低RabbitMQ的性能,我们一般采取发送方确认机制。
发送方确认机制:生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),
一旦消息被投递到所有的匹配队列之后,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经到达目的地了。
如果消息和队列是持久化的,那么消息确认会在消息写入磁盘后发出。