php扩展 rabbitmq
<?php // 建立TCP连接对象 $connection = new AMQPConnection([ 'host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' ]); $exchange_name="leo.direct";//交换机名称 $queue_name="leo_queue";//队列名称 $routing_key="leo_router"; $connection->connect();//链接rabbit 服务器 $channel = new AMQPChannel($connection);//创建通道 $channel->qos(0,2); $queue = new \AMQPQueue($channel);//创建消息队列 $queue->setName($queue_name);//设置消息队列的名称 $queue->setFlags(AMQP_DURABLE);//设置消息队列持久化(消息要想持久化,交换机和消息队列必须要持久化) $queue->declareQueue();//声明消息队列 $queue->bind($exchange_name, $routing_key);//绑定交换机在队列路由 // 消费队列消息 $queue->consume(function ($envelope,$queue){ $msg = $envelope->getBody(); var_dump("Received: " . $msg); $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答 });
二、消费者
<?php // 建立TCP连接对象 $connection = new AMQPConnection([ 'host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' ]); $exchange_name="leo.direct";//交换机名称 $queue_name="leo_queue";//队列名称 $routing_key="leo_router"; $connection->connect();//链接rabbit 服务器 $channel = new AMQPChannel($connection);//创建通道 $channel->qos(0,2); $exchange = new AMQPExchange($channel);//创建交换机 $exchange->setName($exchange_name);//设置交换机名称 $exchange->setType(AMQP_EX_TYPE_DIRECT);//设置交换机类型 $exchange->setFlags(AMQP_DURABLE);//设置交换机持久化 $exchange->declareExchange();//声明交换机 $queue = new \AMQPQueue($channel);//创建消息队列 $queue->setName($queue_name);//设置消息队列的名称 $queue->setFlags(AMQP_DURABLE);//设置消息队列持久化(消息要想持久化,交换机和消息队列必须要持久化) $queue->declareQueue();//声明消息队列 $queue->bind($exchange_name, $routing_key);//绑定交换机在队列路由 $msg=['msg'=>"hello world",'code'=>200,'datetime'=>date('Y-m-d H:i:s')];//发送的消息 $msgBody = is_array($msg)?json_encode($msg):$msg; $exchange->publish($msgBody,$routing_key,AMQP_NOPARAM,array('delivery_mode' => 2));