1.  简介

  • MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法;
  • RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言)编写的,可复用的企业级消息系统;
  • AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制;
  • 支持主流操作系统:Linux、Windows,MacOX等;
  • 支持多种客户端开发语言:Java、Python、Ruby、.NET,PHP、C/C++、Node.js等

2.  下载

下载地址:http://www.rabbitmq.com/download.html

3.  windows下安装

3.1.  安装Erlang

下载:http://www.erlang.org/download/otp_win64_17.3.exe

安装:

 

 

 

 

 

安装完成。

3.2.  安装RabbitMQ

 

 

 

安装完成。

 

 

 

启动、停止、重新安装等。

3.3.  启用管理工具

1、  点击

2、  输入命令:

rabbitmq-plugins enable rabbitmq_management

 

3、  在浏览器中输入地址查看:http://127.0.0.1:15672/

 

4、  使用默认账号登录:guest/ guest

 

4.  Linux下安装

4.1.  安装Erlang

4.1.1. 
添加yum支持

cd /usr/local/src/

mkdir rabbitmq

cd rabbitmq

 

wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm

rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

 

rpm –import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

 

sudo yum install erlang

 

 

4.2.  安装RabbitMQ

上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/

安装:

rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

 

4.2.1. 
启动、停止

service rabbitmq-server start

service rabbitmq-server stop

service rabbitmq-server restart

4.2.2. 
设置开机启动

chkconfig rabbitmq-server on

4.2.3. 
设置配置文件

cd /etc/rabbitmq

cp
/usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/

 

mv rabbitmq.config.example rabbitmq.config

4.2.4. 
开启用户远程访问

vi /etc/rabbitmq/rabbitmq.config

 

注意要去掉后面的逗号。

4.2.5. 
开启web界面管理工具

rabbitmq-plugins enable rabbitmq_management

service rabbitmq-server restart

4.2.6. 
防火墙开放15672端口

/sbin/iptables -I INPUT -p tcp –dport 15672
-j ACCEPT

/etc/rc.d/init.d/iptables save

5.  添加用户

 

5.1.  用户角色

1、  超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、  监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、  策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、  普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、  其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

 

5.2.  创建Virtual Hosts

 

 

 

设置权限:

 

 

6.  RabbitMQ实例

6.1.  简单模式

 

特点:一个生产者P发送消息到队列Q,一个消费者C接收

生产者实现步骤:

  1. 创建连接工厂
  2. 设置服务地址、端口、用户名、密码、vHost
  3. 从连接工厂中获取连接
  4. 从连接获取通道channel
  5. 使用channle 声明队列
  6. 使用channle 向队列发送消息
  7. 关闭channle
  8. 关闭连接

代码如下:

public class Send {

 

    private final static String QUEUE_NAME = “test_queue”;

   

    public static void main(String[] args) {

        

         //定义连接工厂

         ConnectionFactory factory = new ConnectionFactory();

         //设置服务地址

         factory.setHost(“127.0.0.1”);

         //端口

         factory.setPort(5672);

         //设置 vhost ,用户名称,账号

         factory.setVirtualHost(“taotao”);

         factory.setUsername(“taotao”);

         factory.setPassword(“123456”);

         try {

             //通过工厂获取连接

             Connection conn= factory.newConnection();

             //连接中获取通道中

             Channel channel = conn.createChannel();

             //声明队列

             channel.queueDeclare(QUEUE_NAME, false, false, false, null);

             //设置消息内容

             String msg = “Hello World! – 02”;

             channel.basicPublish(“”, QUEUE_NAME, null, msg.getBytes());

            

             System.out.println(“Send :发送成功!”);

             channel.close();

             conn.close();

         } catch (Exception e) {

             // TODO Auto-generated catch block

             e.printStackTrace();

         }

    }

}

当生产者代码成功运行后,我们通过管理工具查看会发现一个队列,并且队列中有一条信息。

 

消费者实现步骤:

  1. 创建连接工厂
  2. 设置服务地址、端口、用户名、密码、vHost
  3. 从连接工厂中获取连接
  4. 从连接获取通道channel
  5. 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
  6. 定义队列消费者
  7. 监听队列
  8. 获取消息

代码如下:

public class Revc {

   

    private final static String QUEUE_NAME = “test_queue”;

   

    public static void main(String[] args) throws Exception {

        

         //定义连接工厂

         ConnectionFactory factory = new ConnectionFactory();

         //设置服务地址

         factory.setHost(“127.0.0.1”);

         //设置端口

         factory.setPort(5672);

         //设置vhost 用户名称  密码

         factory.setUsername(“taotao”);

         factory.setPassword(“123456”);

         factory.setVirtualHost(“taotao”);

        

         //创建连接

         Connection conn = factory.newConnection();

         //从连接中获取通道

       
Channel channel = conn.createChannel();

       

        //声明队列

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //定义队列的消费者

       
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

       

        //监听队列

        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);

        //获取消息

        while (true)

       
{

        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

        String msg = new String(delivery.getBody());

        System.out.println(“Revc :”+msg);

         }

    }

}

 

当消费者代码成功运行后,我们通过管理工具会发现队列中消息的数量从1变为0,这就说明消息被消费者获取以后,会被队列删除。

连接工具类:

代码如下:

/**

 * rabbitmq 工具类

 * @author Administrator

 *

 */

public class ConnectionUtils {

 

    public static Connection
getConnection() throws Exception

    {

         //创建连接工厂

         ConnectionFactory factory = new ConnectionFactory();

         //设置服务地址

         factory.setHost(“127.0.0.1”);

         //设置端口

         factory.setPort(5672);

         //设置vhost

         factory.setVirtualHost(“taotao”);

         //设置用户名称

         factory.setUsername(“taotao”);

         //设置密码

         factory.setPassword(“123456”);

         //从工厂中获取连接

         Connection conn =  factory.newConnection();

         return conn;

    }

}

6.2.  工作队列模式 Work
Queue

 

特点:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

生产者实现步骤【参考6.1】

代码如下:

public class Send {

 

    private final static String QUEUE_NAME = “test_queue_work”;

    public static void main(String[] args) {

        

         try

         {

             //获取连接

             Connection conn = ConnectionUtils.getConnection();

             //从连接中获取管道

             Channel channel = conn.createChannel();

             //声明队列

             channel.queueDeclare(QUEUE_NAME, false, false, false, null);

             //设置消息

             for(int i = 0; i < 50; i++)

             {

                  String msg = “Holle Work ! -“+i;

                  channel.basicPublish(“”, QUEUE_NAME, null,msg.getBytes());

                  Thread.sleep(10*i);

             }

             channel.close();

             conn.close();

         }

         catch(Exception e)

         {

            

         }

    }

}

 

消费者实现步骤:

  1. 创建连接工厂
  2. 设置服务地址、端口、用户名、密码、vHost
  3. 从连接工厂中获取连接
  4. 从连接获取通道channel
  5. 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
  6. 定义队列消费者
  7. 监听队列并手动返回确认
  8. 获取消息【如果设置了手动返回确认则需要返回确消息】

代码如下:

public class Revc {

 

    private final static String QUEUE_NAME = “test_queue_work”;

    public static void main(String[] args) {

         try

         {

             //获取连接

             Connection conn = 
ConnectionUtils.getConnection();

             //从连接中获取管道

             Channel channel = conn.createChannel();

             //声明队列

             channel.queueDeclare(QUEUE_NAME, false, false, false, null);

             //同一时刻服务器只会发一条消息费着

             channel.basicQos(1);

            

             //定义队列消费者

             QueueingConsumer consumer = new QueueingConsumer(channel);

             //监听队列,手动返回完成

             channel.basicConsume(QUEUE_NAME, false,consumer);

             //获取消息

             while(true)

             {

                  QueueingConsumer.Delivery delivery = consumer.nextDelivery();

                  String msg = new String(delivery.getBody());

                  System.out.println(“Revc–>”+msg);

                 

                  //返回确认状态

                  channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
true);

                  Thread.sleep(10);

             }

         }

         catch(Exception e)

         {

            

         }  

    }

}

 

可以设置多个消费者。并设置其休眠时间不同。可以得出,休眠时间越长,获取消息越少。这也体现了work模式的【能者多劳】模式。

注意:设置能者多劳模式必须设置:同一时刻服务器只会发一条消息费者

channel.basicQos(1);

6.3.      
发布/订阅模式Publish/Subscribe

 

特点:

1、         
一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

2、         
生产者:可以将消息发送到队列或者是交换机

3、         
消费者:只能从队列中获取消息。

4、         
如果消息发送到没有队列绑定的交换机上,那么消息将丢失。交换机不能存储消息,消息存储在队列中

 生产者实现步骤:

  1. 创建连接工厂
  2. 设置服务地址、端口、用户名、密码、vHost
  3. 从连接工厂中获取连接
  4. 从连接获取通道channel
  5. 使用channle 声明交换机
  6. 发送消息到交换机
  7. 关闭channel
  8. 关闭连接

 代码如下:

public class Send {

 

    private final static String EXCHANGE_NAME = “test_exchange_fanout”;

   

    public static void main(String[] args) throws Exception {

        

          // 获取到连接以及mq通道

        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

 

        // 声明exchange

        channel.exchangeDeclare(EXCHANGE_NAME, “fanout”);

 

        // 消息内容

        String message = “Hello World!”;

        channel.basicPublish(EXCHANGE_NAME, “”, null, message.getBytes());

        System.out.println(” [x] Sent ‘” + message + “‘”);

        channel.close();

        connection.close();

    }

}

消费者实现步骤:

  1. 创建连接工厂
  2. 设置服务地址、端口、用户名、密码、vHost
  3. 从连接工厂中获取连接
  4. 从连接获取通道channel
  5. 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
  6. 将队列绑定到交换机
  7. 定义队列消费者
  8. 监听队列并手动返回确认
  9. 获取消息【如果设置了手动返回确认则需要返回确消息】

代码如下:

public class Revc {

    private final static String QUEUE_NAME = “test_queue_work”;

 

    private final static String EXCHANGE_NAME = “test_exchange_fanout”;

 

    public static void main(String[] argv) throws Exception {

 

        //获取连接

    Connection
conn = ConnectionUtils.getConnection();

    //从连接中获取管道

    Channel
ch = conn.createChannel();

    //声明队列

    ch.queueDeclare(QUEUE_NAME, false, false, false, null);

    //将队列绑定到交换机

    ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”);

    //定义队列消费者

    QueueingConsumer
consumer = new QueueingConsumer(ch);

    //监听队列,手动返回完成

    ch.basicConsume(QUEUE_NAME, false,consumer);

   

    //获取消息

    while(true)

    {

         QueueingConsumer.Delivery
delivery = consumer.nextDelivery();

         String
msg = new String(delivery.getBody());

         System.out.println(” [ Revc ]”+msg);

         Thread.sleep(100);

         ch.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);

    }

    }

}

6.4. 路由模式Routing

 

特点:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key

生产者实现步骤:

  1. 创建连接工厂
  2. 设置服务地址、端口、用户名、密码、vHost
  3. 从连接工厂中获取连接
  4. 从连接获取通道channel
  5. 使用channle 声明交换机
  6. 发送消息到交换机并指定key
  7. 关闭channel
  8. 关闭连接

代码如下:

public class Send {

 

    private final static String EXCHANGE_NAME = “test_exchange_direct”;

    public static void main(String[] args) throws Exception {

        

         //获取连接

         Connection conn = ConnectionUtils.getConnection();

         //从连接中获取管道

         Channel ch = conn.createChannel();

         //声明exchange

         ch.exchangeDeclare(EXCHANGE_NAME, “direct”);

         //设置消息内容

         String msg = “Hello Work !”;

         ch.basicPublish(EXCHANGE_NAME,
“key”, null,msg.getBytes());

         System.out.println(” [x] Sent ‘” + msg + “‘”);

        ch.close();

        conn.close();

        

    }

}

  1. 创建连接工厂
  2. 设置服务地址、端口、用户名、密码、vHost
  3. 从连接工厂中获取连接
  4. 从连接获取通道channel
  5. 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
  6. 将队列绑定到交换机并指定key
  7. 定义队列消费者
  8. 监听队列并手动返回确认
  9. 获取消息【如果设置了手动返回确认则需要返回确消息】

代码如下:

public class Revc {

 

    private final static String QUEUE_NAME = “test_queue_work”;

 

    private final static String EXCHANGE_NAME = “test_exchange_direct”;

   

    public static void main(String[] args) throws Exception{

        

    //获取连接

    Connection
conn = ConnectionUtils.getConnection();

    //获取chanel 通道

    Channel
ch = conn.createChannel();

    //声明队列

    ch.queueDeclare(QUEUE_NAME, false, false, false, null);

    //绑定队列到交换机

    ch.queueBind(QUEUE_NAME,
EXCHANGE_NAME, “key”);

    //同一时刻服务器只发送一条信息给消费者

    ch.basicQos(1);

    //定义队列消费者

    QueueingConsumer
consumer = new QueueingConsumer(ch);

    //监听队列,并手动返回信息

    ch.basicConsume(QUEUE_NAME, false,consumer);

   

    //获取信息

    while(true)

    {

         QueueingConsumer.Delivery
delivery = consumer.nextDelivery();

         String
msg = new String(delivery.getBody());

         System.out.println(” [x] Received ‘” + msg + “‘”);

            Thread.sleep(10);

 

            ch.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);

    }

    }

}

 

6.5.      
通配符模式Topics

 

特点:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor

“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

 

实现步骤参考路由模式

 

版权声明:本文为wujiaofen原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/wujiaofen/p/11084249.html