RabbitMQ 的工作模式
1. 简介
- MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法;
- RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言)编写的,可复用的企业级消息系统;
- AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制;
- 支持主流操作系统:Linux、Windows,MacOX等;
- 支持多种客户端开发语言:Java、Python、Ruby、.NET,PHP、C/C++、Node.js等
2. 下载
下载地址:http://www.rabbitmq.com/download.html
3. windows下安装
3.1. 安装Erlang
下载:http://www.erlang.org/download/otp_win64_17.3.exe
安装:
安装完成。
3.2. 安装RabbitMQ
安装完成。
启动、停止、重新安装等。
3.3. 启用管理工具
1、 点击
2、 输入命令:
rabbitmq-plugins enable rabbitmq_management
3、 在浏览器中输入地址查看:http://127.0.0.1:15672/
4、 使用默认账号登录:guest/ guest
4. Linux下安装
4.1. 安装Erlang
4.1.1.
添加yum支持
cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm –import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
sudo yum install erlang
4.2. 安装RabbitMQ
上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
安装:
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
4.2.1.
启动、停止
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
4.2.2.
设置开机启动
chkconfig rabbitmq-server on
4.2.3.
设置配置文件
cd /etc/rabbitmq
cp
/usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
4.2.4.
开启用户远程访问
vi /etc/rabbitmq/rabbitmq.config
注意要去掉后面的逗号。
4.2.5.
开启web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
4.2.6.
防火墙开放15672端口
/sbin/iptables -I INPUT -p tcp –dport 15672
-j ACCEPT
/etc/rc.d/init.d/iptables save
5. 添加用户
5.1. 用户角色
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
5.2. 创建Virtual Hosts
设置权限:
6. RabbitMQ实例
6.1. 简单模式
特点:一个生产者P发送消息到队列Q,一个消费者C接收
生产者实现步骤:
- 创建连接工厂
- 设置服务地址、端口、用户名、密码、vHost
- 从连接工厂中获取连接
- 从连接获取通道channel
- 使用channle 声明队列
- 使用channle 向队列发送消息
- 关闭channle
- 关闭连接
代码如下:
public class Send {
private final static String QUEUE_NAME = “test_queue”;
public static void main(String[] args) {
//定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost(“127.0.0.1”); //端口 factory.setPort(5672); //设置 vhost ,用户名称,账号 factory.setVirtualHost(“taotao”); factory.setUsername(“taotao”); factory.setPassword(“123456”); try { //通过工厂获取连接 Connection conn= factory.newConnection(); //连接中获取通道中 Channel channel = conn.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //设置消息内容 String msg = “Hello World! – 02”; channel.basicPublish(“”, QUEUE_NAME, null, msg.getBytes());
System.out.println(“Send :发送成功!”); channel.close(); conn.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
当生产者代码成功运行后,我们通过管理工具查看会发现一个队列,并且队列中有一条信息。
消费者实现步骤:
- 创建连接工厂
- 设置服务地址、端口、用户名、密码、vHost
- 从连接工厂中获取连接
- 从连接获取通道channel
- 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
- 定义队列消费者
- 监听队列
- 获取消息
代码如下:
public class Revc {
private final static String QUEUE_NAME = “test_queue”;
public static void main(String[] args) throws Exception {
//定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost(“127.0.0.1”); //设置端口 factory.setPort(5672); //设置vhost 用户名称 密码 factory.setUsername(“taotao”); factory.setPassword(“123456”); factory.setVirtualHost(“taotao”);
//创建连接 Connection conn = factory.newConnection(); //从连接中获取通道
//声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义队列的消费者
//监听队列 channel.basicConsume(QUEUE_NAME, true, queueingConsumer); //获取消息 while (true) QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println(“Revc :”+msg); } } }
|
当消费者代码成功运行后,我们通过管理工具会发现队列中消息的数量从1变为0,这就说明消息被消费者获取以后,会被队列删除。
连接工具类:
代码如下:
/** * rabbitmq 工具类 * @author Administrator * */ public class ConnectionUtils {
public static Connection { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost(“127.0.0.1”); //设置端口 factory.setPort(5672); //设置vhost factory.setVirtualHost(“taotao”); //设置用户名称 factory.setUsername(“taotao”); //设置密码 factory.setPassword(“123456”); //从工厂中获取连接 Connection conn = factory.newConnection(); return conn; } } |
6.2. 工作队列模式 Work
Queue
特点:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列
生产者实现步骤【参考6.1】
代码如下:
public class Send {
private final static String QUEUE_NAME = “test_queue_work”; public static void main(String[] args) {
try { //获取连接 Connection conn = ConnectionUtils.getConnection(); //从连接中获取管道 Channel channel = conn.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //设置消息 for(int i = 0; i < 50; i++) { String msg = “Holle Work ! -“+i; channel.basicPublish(“”, QUEUE_NAME, null,msg.getBytes()); Thread.sleep(10*i); } channel.close(); conn.close(); } catch(Exception e) {
} } } |
消费者实现步骤:
- 创建连接工厂
- 设置服务地址、端口、用户名、密码、vHost
- 从连接工厂中获取连接
- 从连接获取通道channel
- 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
- 定义队列消费者
- 监听队列并手动返回确认
- 获取消息【如果设置了手动返回确认则需要返回确消息】
代码如下:
public class Revc {
private final static String QUEUE_NAME = “test_queue_work”; public static void main(String[] args) { try { //获取连接 Connection conn = //从连接中获取管道 Channel channel = conn.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //同一时刻服务器只会发一条消息费着 channel.basicQos(1);
//定义队列消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false,consumer); //获取消息 while(true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println(“Revc–>”+msg);
//返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), Thread.sleep(10); } } catch(Exception e) {
} } }
|
可以设置多个消费者。并设置其休眠时间不同。可以得出,休眠时间越长,获取消息越少。这也体现了work模式的【能者多劳】模式。
注意:设置能者多劳模式必须设置:同一时刻服务器只会发一条消息费者
channel.basicQos(1);
6.3.
发布/订阅模式Publish/Subscribe
特点:
1、
一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者
2、
生产者:可以将消息发送到队列或者是交换机
3、
消费者:只能从队列中获取消息。
4、
如果消息发送到没有队列绑定的交换机上,那么消息将丢失。交换机不能存储消息,消息存储在队列中
生产者实现步骤:
- 创建连接工厂
- 设置服务地址、端口、用户名、密码、vHost
- 从连接工厂中获取连接
- 从连接获取通道channel
- 使用channle 声明交换机
- 发送消息到交换机
- 关闭channel
- 关闭连接
代码如下:
public class Send {
private final static String EXCHANGE_NAME = “test_exchange_fanout”;
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel();
// 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, “fanout”);
// 消息内容 String message = “Hello World!”; channel.basicPublish(EXCHANGE_NAME, “”, null, message.getBytes()); System.out.println(” [x] Sent ‘” + message + “‘”); channel.close(); connection.close(); } } |
消费者实现步骤:
- 创建连接工厂
- 设置服务地址、端口、用户名、密码、vHost
- 从连接工厂中获取连接
- 从连接获取通道channel
- 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
- 将队列绑定到交换机
- 定义队列消费者
- 监听队列并手动返回确认
- 获取消息【如果设置了手动返回确认则需要返回确消息】
代码如下:
public class Revc { private final static String QUEUE_NAME = “test_queue_work”;
private final static String EXCHANGE_NAME = “test_exchange_fanout”;
public static void main(String[] argv) throws Exception {
//获取连接 Connection //从连接中获取管道 Channel //声明队列 ch.queueDeclare(QUEUE_NAME, false, false, false, null); //将队列绑定到交换机 ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”); //定义队列消费者 QueueingConsumer //监听队列,手动返回完成 ch.basicConsume(QUEUE_NAME, false,consumer);
//获取消息 while(true) { QueueingConsumer.Delivery String System.out.println(” [ Revc ]”+msg); Thread.sleep(100); ch.basicAck(delivery.getEnvelope().getDeliveryTag(), } } } |
6.4. 路由模式Routing
特点:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key
生产者实现步骤:
- 创建连接工厂
- 设置服务地址、端口、用户名、密码、vHost
- 从连接工厂中获取连接
- 从连接获取通道channel
- 使用channle 声明交换机
- 发送消息到交换机并指定key
- 关闭channel
- 关闭连接
代码如下:
public class Send {
private final static String EXCHANGE_NAME = “test_exchange_direct”; public static void main(String[] args) throws Exception {
//获取连接 Connection conn = ConnectionUtils.getConnection(); //从连接中获取管道 Channel ch = conn.createChannel(); //声明exchange ch.exchangeDeclare(EXCHANGE_NAME, “direct”); //设置消息内容 String msg = “Hello Work !”; ch.basicPublish(EXCHANGE_NAME, System.out.println(” [x] Sent ‘” + msg + “‘”); ch.close(); conn.close();
} } |
- 创建连接工厂
- 设置服务地址、端口、用户名、密码、vHost
- 从连接工厂中获取连接
- 从连接获取通道channel
- 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
- 将队列绑定到交换机并指定key
- 定义队列消费者
- 监听队列并手动返回确认
- 获取消息【如果设置了手动返回确认则需要返回确消息】
代码如下:
public class Revc {
private final static String QUEUE_NAME = “test_queue_work”;
private final static String EXCHANGE_NAME = “test_exchange_direct”;
public static void main(String[] args) throws Exception{
//获取连接 Connection //获取chanel 通道 Channel //声明队列 ch.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定队列到交换机 ch.queueBind(QUEUE_NAME, //同一时刻服务器只发送一条信息给消费者 ch.basicQos(1); //定义队列消费者 QueueingConsumer //监听队列,并手动返回信息 ch.basicConsume(QUEUE_NAME, false,consumer);
//获取信息 while(true) { QueueingConsumer.Delivery String System.out.println(” [x] Received ‘” + msg + “‘”); Thread.sleep(10);
ch.basicAck(delivery.getEnvelope().getDeliveryTag(), } } }
|
6.5.
通配符模式Topics
特点:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
实现步骤参考路由模式