python的pika模块操作rabbitmq
- 上一篇博文rabbitmq的构架和原理,了解了rabbitmq的使用原理,接下来使用python的pika模块实现使用rabbitmq。
环境搭建
-
安装python,不会的请参考Linux安装配置python3.6环境
-
安装pika模块
pip install pika
实例介绍
-
先从一个最简单的生产者/消费者说起
-
send.py
import pika
HOST = '127.0.0.1'
PORT = '5672'
QUEUENAME = 'eeg'
FILENAME = './eeg2.txt'
EXCHANGE = 'eegs'
ROUT_KEY = 'eeg'
USERNAME = 'user'
PASSWD = 'passwd'
class Client(object):
def __init__(self,host,queuename,filename,username=USERNAME,passwd=PASSWD):
self.__host = host
self.__name = queuename
self.__filename = filename
self.__username = username
self.__passwd = passwd
self.connect = self.connect_mq()
# 连接mq
def connect_mq(self):
# 添加用户名和密码
credentials = pika.PlainCredentials(self.__username, self.__passwd)
# 配置连接参数
parameters = pika.ConnectionParameters(host=self.__host,credentials=credentials)
try:
# 创建一个连接对象
connection = pika.BlockingConnection(parameters)
except Exception as e:
print(e)
else:
return connection
# 创建一个信道
def channel_mq(self):
channel = self.connect.channel()
return channel
# 打开文件
def open_data(self):
try:
with open(self.__filename,'r',encoding='utf-8') as f:
data = f.read()
return data
except Exception as e:
print(e)
def run(self,channel):
# 声明一个队列,durable参数声明队列持久化
channel.queue_declare(queue='eeg1',durable=True)
# 使用默认交换机投递消息,返回TRUE或False
channel.basic_publish(exchange='',
routing_key='eeg1',
body="hello wrold!",
properties=pika.BasicProperties(delivery_mode=2))
# 关闭tcp连接
def close_connect(self):
self.connect.close()
# 关闭信道
def close_channel(self,channel):
channel.close()
if __name__ == '__main__':
client = Client(HOST,MQ_NAME,FILENAME)
channel = client.channel_mq()
client.run(channel)
client.close_connect()
- receiver.py
import pika
HOST = '127.0.0.1'
PORT = '5672'
QUEUENAME = 'eeg'
EXCHANGE = 'eegs'
ROUT_KEY = 'eeg'
USERNAME = 'user'
PASSWD = 'passwd'
class Receive(object):
def __init__(self,host,queuename,username=USERNAME,passwd=PASSWD):
self.__host = host
self.__name = queuename
self.__username = username
self.__passwd = passwd
# 连接mq队列
def connect_mq(self):
# 添加用户名和密码
credentials = pika.PlainCredentials(self.__username, self.__passwd)
# 配置连接参数
parameters = pika.ConnectionParameters(host=self.__host, credentials=credentials)
# 创建一个连接对象
connection = pika.BlockingConnection(parameters)
# 创建一个信道
channel = connection.channel()
return channel
def run(self):
channel = self.connect_mq()
# 订阅消息
channel.basic_consume(self.callback,
queue=self.__name,
no_ack=False)
# print(' [*] Waiting for messages. To exit press CTRL+C')
# 循环等待
channel.start_consuming()
# 接收消息处理函数
def callback(self,ch, method, properties, body):
print(" [x] Received %r" % str(body.decode('utf-8'))
print('接收成功!')
# 发送确认
ch.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == '__main__':
receive = Receive(HOST,QUEUENAME)
receive.run()
- 我们来分析每一步的所使用的方法
创建一个连接connection
# 添加用户名和密码
credentials = pika.PlainCredentials(self.__username, self.__passwd)
# 配置连接参数
parameters = pika.ConnectionParameters(host=self.__host, credentials=credentials)
# 创建一个连接对象
connection = pika.BlockingConnection(parameters)
- pika.PlainCredentials:一个凭据类
# 该类传递的参数
def __init__(self, username, # 用户名
password, # 密码
erase_on_connect=False): # 是否保存凭据在内存中,如果参数是True,那么该类会在连接完成后删除
- pika.ConnectionParameters:TCP连接的参数类
def __init__(self,
host=_DEFAULT, # 默认的ip127.0.0.1
port=_DEFAULT, # 端口5672
virtual_host=_DEFAULT, # 默认的虚拟主机/
credentials=_DEFAULT, # 默认的凭据 user:guest passwd:guest
# 以下的参数含义可以在配置文件中找到,一般不需要在这里配置
channel_max=_DEFAULT,
frame_max=_DEFAULT,
heartbeat=_DEFAULT,
ssl=_DEFAULT,
ssl_options=_DEFAULT,
connection_attempts=_DEFAULT,
retry_delay=_DEFAULT,
socket_timeout=_DEFAULT,
locale=_DEFAULT,
backpressure_detection=_DEFAULT,
blocked_connection_timeout=_DEFAULT,
client_properties=_DEFAULT,
tcp_options=_DEFAULT,
**kwargs):
- pika.BlockingConnection:创建一个连接类
def __init__(self, parameters=None, _impl_class=None):
# 传递一个参数类就可以了,_impl_class只用于测试
创建一个通道
# 创建一个信道
channel = connection.channel()
def channel(self,
channel_number=None): # 指定通道的编号,一般让rabbitmq自动去管理
声明一个队列queue
- channel.queue_declare
def queue_declare(self, queue='', # 队列的名字,默认为空,此时将自动创建一个名字,
passive=False, # 检查一下队列是否存在,如果该参数为True,该方法判断队列存在否,不会声明队列;存在返回queue的状态,不存在报错
durable=False, # 队列持久化参数,默认不持久化
exclusive=False, # 设置独享队列,该队列只被当前的connection使用,如果该tcp关闭了,队列会被删除
auto_delete=False,# 当最后一个消费者退订后自动删除,默认不开启
arguments=None) # 一个字典,用于队列传递额外的参数
声明一个交换机exchange
- channel.exchange_declare
def exchange_declare(self, exchange=None, # 交换机的名字,为空则自动创建一个名字
exchange_type='direct', # 默认交换机类型为direct
passive=False, # 检查交换机是否存在,存在返回状态信息,不存在返回404错误
durable=False, # 设置是否持久化
auto_delete=False, # 最后一个队列解绑则删除
internal=False, # 是否设置为值接收从其他交换机发送过来的消息,不接收生产者的消息
arguments=None): # 一个字典,用于传递额外的参数
声明一个绑定
- channel.queue_bind
def queue_bind(self, queue, # 队列的名字
exchange, # 交换机的名字
routing_key=None, # 路由键规则,当为None时,默认使用queue的名字作为路由键规则
arguments=None): # 一个字典,传递额外的参数
# 返回绑定的状态信息
- channel.exchange_bind
def exchange_bind(self, destination=None, # 目的交换机的名字
source=None, # 源交换机的名字
routing_key='', # 路由键规则,当为None时,默认使用queue的名字作为路由键规则
arguments=None): # 一个字典,传递额外的参数
投递一条消息
- channel.basic_publish
def basic_publish(self, exchange, # 交换机的名字
routing_key, # 路由键,吐过交换机是扇形交换机,可以随便写
body, # 消息主体
properties=None, # 消息的属性
mandatory=False, # 是否设置消息托管
immediate=False) # 是否消息实时同步确认,一般和confirm模式配合使用
# properties属性有一个专门的类来设置
pika.BasicProperties:
def __init__(self, content_type=None, content_encoding=None, headers=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None):
self.content_type = content_type # 消息的类型,如text/html,json等
self.content_encoding = content_encoding # 消息的编码,如gbk,utf-8等
self.headers = headers # 消息头,可以和头交换机约定规则
self.delivery_mode = delivery_mode # 消息持久化,2表示持久化,
self.priority = priority # 消息的优先权
self.correlation_id = correlation_id
self.reply_to = reply_to
self.expiration = expiration # 消息的有效期
self.message_id = message_id # 消息iD,自动管理
self.timestamp = timestamp # 消息的时间戳
self.type = type
self.user_id = user_id
self.app_id = app_id # 发布应用的ID
self.cluster_id = cluster_id
订阅消息
-
方式一:客户端主动推送消息
-
channel.basic_consume+channel.start_consuming
def basic_consume(self, # 启动队列消费者,告诉服务端开启一个消费者
consumer_callback, # 消费者回调函数
queue, # 队列名称
no_ack=False, # 发送确认,默认开启消息确认模式,为True是关闭消息确认;如果回调函数中不发送消息确认,消息会一直存在队列中,等待推送给新连接的消费者
exclusive=False, # 设置独享消费者,不允许其他消费者订阅该队列
consumer_tag=None, # 消费者标签,如果不指定,系统自动生成
arguments=None): # 字典,额外的参数
consumer_callback:回调函数
def consumer_callback(channel, # 信道
method, # 一个交付的deliver对象,用来通知客户端消息
properties, # 消息的属性,就是消息在发送时定义的属性
body) # 消息的主题,二进制格式
method:spec.Basic.Deliver:交付对象的属性
def __init__(self, consumer_tag=None, delivery_tag=None, redelivered=False, exchange=None, routing_key=None):
self.consumer_tag = consumer_tag # 消费者标签,用来标记是哪一个消费者
self.delivery_tag = delivery_tag # 交付标签,用来发送消息确认和标记这是推送给该消费者的第几条消息
self.redelivered = redelivered # bool类型,若果为False表示这是消息第一次被推送,如果是True,表示这是一条被重复推送的消息
self.exchange = exchange # 该消息是从哪个交换机上来的
self.routing_key = routing_key # 该消息的路由键是什么
# 在回到函数中,可以通过
method.consumer_tag/method.delivery_tag/method.redelivered等获取相应的属性
- 开始阻塞等待消息:start_consuming
阻塞等待消息是有时间限制的,超过一定时间内如果没有新的消息可以推送会强制关闭连接,因此如果需要全时段等待的话需要监听该连接;
-
方式二:客户端主动获取消息
-
channel.basic_get:同步获取消息,性能比方式一低
def basic_get(self,
queue=None, # 队列名称
no_ack=False): # 是否需要开启确认模式
return method,properties,body
# 需要主动进行消息确认,basic_ack
取消订阅
-
取消某个消费者订阅:basic_cancel
-
取消所有的订阅:stop_consuming
订阅消息确认
- basic_ack
def basic_ack(self,
delivery_tag=0, # 消息的标记,int类型,一般将回调函数consumer_callback中获取的交付标记放到这个位置
multiple=False): # Flase表示确认单个消息,为True表示确认多个消息
- delivery_tag : 消息交付标记,每一条到消费者的消息,服务器会分配一个从1开始往上加的int数字,
订阅消息拒绝
- basic_nack
def basic_nack(self,
delivery_tag=None, # 交付这标记,和basic_ack一样
multiple=False, # Flase表示拒绝单个消息,为True表示拒绝多个消息
requeue=True) # True表示拒绝了消息后重新放回队列,False表示丢弃消息
- basic_reject:另一个方法,只能拒绝单个消息,没有multiple参数;
公平调度
- basic_qos:一般在信道声明的时候使用,确定该信道的预取数,提高性能
def basic_qos(self,
prefetch_size=0, # 设置预取数据包的大小
prefetch_count=0, # 设置预取的数量,如果为0则不预取,消费者处理越快,可以将这个这设置的越高
all_channels=False) # 是否将所有的信道都设置上述参数
投递消息确认机制
-
使用AMQP协议的事务方式
-
有三个方法tx_select,tx_commit,tx_rollback
# 开启一个事务,在提交事务之前必须先执行此方法
channel.tx_select
# 提交一个事务
channel.tx_select
# 捕捉到异常就使用回滚
channel.tx_rollback
- 举例
import pika
if __name__ == "__main__":
# 配置连接参数
parameters = pika.ConnectionParameters(host=self.__host)
# 创建一个连接对象
connection = pika.BlockingConnection(parameters)
# 创建一个信道
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='test',durable=True)
# 开启事务
channel.tx_select()
try:
channel.basic_publish(exchange='',
routing_key='test',
body='hello-world')
result = 1/0
channel.tx_commit()
except:
channel.tx_rollback()
# tx_select和tx_commit之间的所有的操作都是事务的一部分
-
以上的方式是十分消耗rabbitmq的性能的,一般不推荐使用;
-
confirm 模式
rabbbitmq自带confirm模式,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID)。
- 方法:channel.confirm_delivery
import pika
if __name__ == "__main__":
# 配置连接参数
parameters = pika.ConnectionParameters(host=HOST)
# 创建一个连接对象
connection = pika.BlockingConnection(parameters)
# 创建一个信道
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='test', durable=True)
# 打开通道的确认模式
channel.confirm_delivery()
for i in range(3):
result = channel.basic_publish(exchange='',
routing_key='test',
body='hello-world')
if result:
break
- 说明:
-
当确认模式没有打开时,即使队列和交换机不存在,投递消息返回的都是True;
-
当确认模式打开时,投递失败会返回False,成功返回True,如果队列不存在,交换机会叫消息丢掉,但不会通知生产者;如果交换机不存在,会报错;
-
同一个信道,确认模式和事务模式只能存在一个,不能同时启用,否则报错;
交换机相互绑定
- 方法:channel.exchange_bind
import pika
if __name__ == "__main__":
# 添加用户名和密码
credentials = pika.PlainCredentials(USERNAME, PASSWD)
# 配置连接参数
parameters = pika.ConnectionParameters(host=HOST, credentials=credentials)
# 创建一个连接对象
connection = pika.BlockingConnection(parameters)
# 创建一个信道
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='test1', durable=True)
channel.queue_declare(queue='test2', durable=True)
# 声明交换机
channel.exchange_declare('myname')
channel.exchange_declare('youname')
# 交换机绑定
channel.exchange_bind(destination='youname',source='myname',routing_key='ourheart')
# 队列绑定
channel.queue_bind(queue='test1',exchange='myname',routing_key='ourheart')
channel.queue_bind(queue='test2', exchange='youname',routing_key='ourheart')
channel.basic_publish(exchange='myname',
routing_key='ourheart',
body='hello-world')
- 说明:
-
交换机相互绑定后,如果他们之间连接的桥routing_key是相同的,向源交换机投递消息,数据可以到达相同路由键的所有的队列;向目的交换机投递消息,消息不能到达源交换机;
-
交换机设置了internal了True参数后,该交换机不能再接收到生产者发送的消息,但可以得到源交换机发送的消息;
其他方法
-
删除交换机:exchange_delete
-
删除队列:queue_delete
-
清除指定队列的所有的消息:queue_purge
-
队列和交换机解除绑定:queue_unbind
-
清除消费者:basic.cancel,指定消费者的consumer-tag