1 RabbitMQ简介

       RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,官网地址:http://www.rabbitmq.com。RabbitMQ作为一个消息代理,主要负责接收、存储和转发消息,它提供了可靠的消息机制和灵活的消息路由,并支持消息集群和分布式部署,常用于应用解耦,耗时任务队列,流量削锋等场景。本系列文章将系统介绍RabbitMQ的工作机制,代码驱动和集群配置,本篇主要介绍RabbitMQ中一些基本概念,常用的RabbitMQ Control命令,最后写一个C#驱动的简单栗子。先看一下RabbitMQ的基本结构:

  上图是RabbitMQ的一个基本结构,生产者Producer和消费者Consumer都是RabbitMQ的客户端,Producer负责发送消息,Consumer负责消费消息。

接下来我们结合这张图来理解RabbitMQ的一些概念:

  Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。

  Virtual Host:一个虚拟概念,一个Virtual Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。

  Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四种,不同类型的Exchange路由规则是不一样的(这些以后会详细介绍)。

  Queue:消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。

  Message:就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。

  Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

  Channel:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。

2 RabbitMQ安装

  因为RabbitMQ是用erlang语言开发的,所以我们在安装RabbitMQ前必须要安装erlang支持。

1 Windows平台安装

1 安装erlang

  首先下载erlang,直接下载最新版本,当前下载的是 OTP 21.3 Windows 64-bit Binary File ,下载完成后一直下一步安装即可。

2 安装RabbitMQ

  下载Windows平台的RabbtMQ,当前下载的是 rabbitmq-server-3.7.14.exe ,下载完成后一直下一步安装即可。

3 安装Web管理插件

  打开RabbitMQ Command Prompt,执行命令 rabbitmq-plugins enable rabbitmq_management 即可完成Web监控插件的安装。 

  安装完成后,打开浏览器输入 http://127.0.0.1:15672/ 使用默认账号[ name:guest / password:guest ]登录后界面如下,使用这个UI插件我们可以轻松的查看RabbitMQ中的交换机(exchange),队列(queue)等内容,也可以对exchange,queue,用户等进行添加、修改、删除操作。

  到这一步Windows平台安装RabbitMQ完成了。 打开服务管理器,RabbitMQ已经在正常运行了,如下:

2 Centos安装RabbitMQ

1 安装RabbitMQ

  这里虚拟机系统为Centos7,采用的安装方式是yum安装,为了简单,这里直接使用官方提供的erlang和RabbitMQ-server的自动安装脚本(官方安装文档),逐行执行下边的代码就可以安装完成erlang和RabbitMQ。

#安装socat
yum install socat

#安装erlang
  curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
  yum -y install erlang

#安装rabbitmq-server
  curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
  yum -y install rabbitmq-server

#启动rabbitmq服务
  systemctl start rabbitmq-server
#添加web管理插件
  rabbitmq-plugins enable rabbitmq_management

补充:如果安装完成后,执行RabbitMQ执行命令特别慢,或者出现报错【rabbitmq unable to perform an operation on node  xxx@xxx】,解决方法:

  编辑hosts,执行命令 vim /etc/hosts ,添加本机IP(或者虚拟机IP)

  命令执行结束后,使用浏览器访问 http://127.0.0.1:15672/ 也会出现web管理界面。通过上边的安装步骤安装的RabbitMQ会生成Unit文件,所以我们可以使用Systemd管理RabbitMQ服务,以下是几条常用的命令:

#启动RabbitMQ服务
  systemctl start rabbitmq-server
#停止RabbitMQ服务
  systemctl stop rabbitmq-server
#查看RabbitMQ运行状态
  systemctl status rabbitmq-server
#重启RabbitMQ服务
  systemctl restart rabbitmq-server

2 RabbitMQ Control工具

  使用Web管理界面可以实现RabbitMQ的大部分常用功能,但是有些功能WebUI是做不到的,如:开启/关闭RabbitMQ应用程序和集群的管理等。RabbitMQ Control是RabbitMQ的命令行管理工具,可以调用所有的RabbitMQ内置功能,主命令是rabbitmqctl ,下边是一个查询用户列表的命令,注意需要切换到sbin目录下执行:

  为了方便的使用RabbitMQ Control工具,我们最好添加一个环境变量,Windows默认安装时在PATH中添加一条: C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin ,不是默认安装的话找到对应的安装目录添加PATH。按照上的安装方法,Centos可以直接使用RabbitMQ Control工具,不需要多余的配置。如果想详细了解RabbitMQ Control工具,可以参考RabbitMQ Control的官方文档

  这里总结了一些最常用到的RabbitMQ Controll命令,有兴趣的小伙伴可以试着运行一下这些命令,如在命令行工具中使用命令 rabbitmqctl add_user <username> <password>  添加一个新用户。

1 基本控制命令

  基本控制命令主要用于启动、停止应用程序、runtime等

#停止rabbitmq和runtime
  rabbitmqctl shutdown
#停止erlang节点
  rabbitmqctl stop 
#启用rabbitmq 
  rabbitmqctl start_app 
#停止rabbitmq 
  rabbitmqctl stop_app
#查看状态
  rabbitmqctl status 
#查看环境
  rabbitmqctl environment
#rabbitmq恢复最初状态,内部的exchange和queue都清除
  rabbitmqctl reset 

2 服务状态管理

  这些命令主要用于用于查看exchang、channel、binding、queue、consumers:

#返回queue的信息
  list_queues [-p <vhostpath>] [<queueinfoitem> ...] 
#返回exchange的信息
  list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...] 
#返回绑定信息
  list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
 #返回链接信息
  list_connections [<connectioninfoitem> ...] 
#返回目前所有的channels
  list_channels [<channelinfoitem> ...]   
#返回consumers
  list_consumers [-p <vhostpath>] 

3 用户管理命令

  这些命令主要用于添加、修改、删除用户及管理用户权限

#在rabbitmq的内部数据库添加用户
  add_user <username> <password>  
#删除一个用户
  delete_user <username> 
#改变用户密码 
  change_password <username> <newpassword> 
#清除用户密码,禁止用户登录
  clear_password <username>
#设置用户tags,就是设置用户角色
  set_user_tags <username> <tag> 
# 查看用户列表
  list_users 
#创建一个vhost
  add_vhost <vhostpath> 
#删除一个vhosts
  delete_vhost <vhostpath>    
#列出vhosts
  list_vhosts [<vhostinfoitem> ...] 
#针对一个vhosts 给用户赋予相关权限
  set_permissions [-p <vhostpath>] <user> <conf> <write> <read> 
#清除一个用户对vhost的权限
  clear_permissions [-p <vhostpath>] <username> 
#列出所有用户对某一vhost的权限
  list_permissions [-p <vhostpath>]  
#列出某用户的访问权限
  list_user_permissions <username> 

4 集群管理命令

#clusternode表示node名称,--ram表示node以ram node加入集群中。默认node以disc node加入集群,在一个node加入cluster之前,必须先停止该node的rabbitmq应用,即先执行stop_app。
  join_cluster <clusternode> [--ram]   
#显示cluster中的所有node
  cluster_status 
#设置集群名字
  set_cluster_name <clustername>
#修改集群名字
  rename_cluster_node <oldname> <newname>
#改变一个cluster中node的模式,该节点在转换前必须先停止,不能把一个集群中唯一的disk node转化为ram node
  change_cluster_node_type <disc | ram>
#远程删除一个节点,删除前必须该节点必须先停止
  rabbitmqctl forget_cluster_node rabbit@rabbit1
#同步镜像队列
  sync_queue <queuename>
#取消同步队列
  cancel_sync_queue <queuename>  
#清空队列中所有消息
  purge_queue [-p vhost] <queuename>

  这里列举的很多命令是现阶段用不到的,如集群控制相关的命令,这些命令的用法会在以后的章节中逐渐理解。

3  C#驱动RabbitMQ

1 一个简单的栗子

  作为开发者,我们最在意的还是怎么在代码中使用RabbitMQ,可以通过官方RabbitMQ开发文档来学习RabbitMQ的使用,这里以.NET为例演示一下RabbitMQ的最基本用法。创建两个Console应用,一个作为发送消息的生产者(Producer),一个作为接受消息的消费者(Consumer),生产者向队列写入消息,消费者接受这条消息,结构如下:

 

  两个控制台应用都要添加RabbitMQ.Client包,命令如下:

Install-Package RabbitMQ.Client

 生产者(Producer)代码:

    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            //第一步:创建连接connection
            using (var connection = factory.CreateConnection())
            {
                //第二步:创建通道channel
                using (var channel = connection.CreateModel())
                {
                    //第三步:声明交换机exchang
                    channel.ExchangeDeclare(exchange: "myexchange",
                                            type: ExchangeType.Direct,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //第四步:声明队列queue
                    channel.QueueDeclare(queue: "myqueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    Console.WriteLine("生产者准备就绪....");
                    //第五步:绑定队列到交互机
                    channel.QueueBind(queue:"myqueue", exchange:"myexchange", routingKey:"mykey");
                    string message = "";
                    //第六步:发送消息
                    //在控制台输入消息,按enter键发送消息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        //基本发布
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine($"消息【{message}】已发送到队列");
                    }
                }
            }
            Console.ReadKey();
        }
    }

 消费者(Consumer)代码: 

    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            //第一步:创建连接connection
            using (var connection = factory.CreateConnection())
            {
                //第二步:创建通道channel
                using (var channel = connection.CreateModel())
                {
                    //第三步:声明队列queue
                    channel.QueueDeclare(queue: "myqueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    //第四步:定义消费者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"接受到消息【{message}】");
                    };
                    Console.WriteLine("消费者准备就绪....");
                    //第五步:处理消息
                    channel.BasicConsume(queue: "myqueue",
                                         autoAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
    } 

  依次运行Producer和Consumer两个应用程序,运行结果如下:

  注意:上边的代码在生产者和消费者的代码中都声明了exchange和queue,这主要是为了让这两个程序可以按任意顺序启动,如:我们只在生产者代码中定义了exchange和queue,却先启动消费者,这会让造成消费者找不到自己需要的exhange和queue(出现404错误)。实际开发中创建exchange/queue、绑定队列以及设置routingKey这些工作,都可以通过WebUI管理界面或者使用Rabbitmq Control工具完成。

  QueueDeclare方法用于声明队列,ExchangeDeclare用于声明交换机,我们在使用这两个方法声明时,可以设置队列和交换机的属性,如queue的名字,长度限制,exchange是否持久化、交换机类型等。

2 QueueDeclare方法详解

  在上边的栗子中我们使用了声明队列的方法  QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments) ,该方法通过参数设置队列的特性。这里介绍一下该方法 中几个参数的作用,先看一个完整的声明队列的栗子:

             //声明队列newsQueue
                    channel.QueueDeclare(queue: "myqueue",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: new Dictionary<string, object>() {
                                             //队列中消息的过期时间是8s
                                             { "x-message-ttl",1000*8 }, 
                                             //队列60s没有被使用,则删除该队列
                                             {"x-expires",1000*60 },
                                             //队列最多保存100条消息
                                             {"x-max-length",100 },
                                             //队列中ready类型消息总共不能超过1000字节
                                             {"x-max-length-bytes",1000 },
                                             //当队列消息满了时,丢弃传来后续消息
                                             {"x-overflow","reject-publish" },
                                             //丢弃的消息发送到deadExchange交换机
                                             {"x-dead-letter-exchange","deadExchange" },
                                             //丢弃的消息发送到deadExchange交换机时的RoutingKey
                                             {"x-dead-letter-routing-key","deadKey" },
                                             //队列中最大的优先级等级为10(在Publish消息时对每条消息设置优先级)
                                             {"x-max-priority",10 },
                                             //设置队列默认为lazy
                                             {"x-queue-mode","lazy" }
                                         });

QueueDeclare方法的参数如下:

  queue:队列名字;

  durable:是否持久化。设置为true时,队列信息保存在rabbitmq的内置数据库中,服务器重启时队列也会恢复(注意:重启后队列内部的消息不会恢复,怎么实现消息持久化以后会详细介绍);

  exclusive:是否排外。设置为true时只有首次声明该队列的Connection可以访问,其他Connection不能访问该队列;且在Connection断开时,队列会被删除(即使durable设置为true也会被删除);

  autoDelete:是否自动删除。设置为true时,表示在最后一条使用该队列的连接(Connection)断开时,将自动删除这个队列;

  arguments:设置队列的一些其它属性,为Dictionary<string,object>类型,下表总结了arguments中可以设置的常用属性。

参数名                                              作用 示例
Message TTL 设置队列中消息的有效时间 { “x-message-ttl”,1000*8 },设置队列中的所有消息的有效期为8s;
Auto expire 自动删除队列。一定的时间内队列没有被使用,则自动删除队列 {“x-expires”,1000*60 },设置队列的过期时长为60s,如果60s没有队列被访问,则删除队列;
Max length 队列能保存消息的最大条数 {“x-max-length”,100 },设置队列最多保存100条消息;
Max length bytes 队列中ready类型消息的总字节数  {“x-max-length-bytes”,1000 }, 设置队列中ready类型消息总共不能超过1000字节;
Overflow behaviour  当队列消息满了时,再接收消息时的处理方法。有两种处理方案:默认为”drop-head“模式,表示从队列头部丢弃消息;reject-publish“表示不接收后续的消息 {“x-overflow”,”reject-publish” },设置当队列消息满了时,丢弃传来后续消息;
Dead letter exchange  用于存储被丢弃的消息的交换机名。Overflow behaviour 的两种处理方案中丢弃的消息都会发送到这个交换机 {“x-dead-letter-exchange”,”beiyongExchange” },设置丢弃的消息发送到名字位beiyongExchange的交换机;
Dead letter routing key  被丢弃的消息发送到Dead letter exchange时的使用的routing Key {“x-dead-letter-routing-key”,”deadKey” },设置丢弃的消息发送到beiyongExchange交换机时的RoutingKey值是”deadKey”;
Maximum priority  设置队列中消息优先级的最大等级,在publish消息时可以设置单条消息的优先级等级 {“x-max-priority”,10 },设置中消息优先级的最大等级为10;
Lazy mode  设置队列的模式,如果设置为Lazy表示队列中消息尽可能存放在磁盘中,以减少内存占用;不设置时消息都存放在队列中,用以尽可能快的处理消息 {“x-queue-mode”,”lazy”},3.6以后版本可用,设置队列中消息尽可能存放在磁盘中,以减少内存占用。在消息拥堵时和消息持久化配置使用可以减少内存占用。

3 ExchangeDeclare方法详解

   声明交换机的方法 void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments) 可以设置交换机的特性,这里简单介绍一下这个方法的几个参数:

 channel.ExchangeDeclare(exchange: "myexchange",
                         type: ExchangeType.Direct,
                         durable: true,
                         autoDelete: false,
               arguments: new Dictionary<string, object> { 
                        {"alternate-exchange","BeiyongExchange" }//如果消息不能路由到该交换机,就把消息路由到备用交换机BeiyongExchange上
               });

  exchange:交换机名字。

  type:交换机类型。exchange有direct、fanout、topic、header四种类型,在下一篇会详细介绍;

  durable:是否持久化。设置为true时,交换机信息保存在rabbitmq的内置数据库中,服务器重启时交换机信息也会恢复;

  autoDelete:是否自动删除。设置为true时,表示在最后一条使用该交换机的连接(Connection)断开时,自动删除这个exchange;

  arguments:其他的一些参数,类型为Dictionary<string,object> 。

小结

  本节主要介绍了RabbitMQ的基本概念,在Windows和Centos上的安装方法,及RabbitMQ Control工具的基本使用,最后演示了一个C#驱动RabbitMQ的栗子,并详细介绍了声明queue和exchange的方法。通过这一节我们大概了解了RabbitMQ的基本使用。以后的章节会逐渐介绍RabbitMQ的四种exchange、两种Consumer的特点和使用场景,以及消息确认、优先级、持久化等,最后搭建一个高可用的RabbitMQ集群,如果文中有错误的话,希望大家可以指出,我会及时修改,谢谢!

 

参考文章

【1】 https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

【2】https://www.cnblogs.com/operationhome/p/10483840.html

 

版权声明:本文为wyy1234原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/wyy1234/p/10743567.html