<?php
// 建立TCP连接对象
$connection = new AMQPConnection([
    'host' => '127.0.0.1',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
]);
$exchange_name="leo.direct";//交换机名称
$queue_name="leo_queue";//队列名称
$routing_key="leo_router";
$connection->connect();//链接rabbit 服务器
$channel = new AMQPChannel($connection);//创建通道
$channel->qos(0,2);
$queue = new \AMQPQueue($channel);//创建消息队列
$queue->setName($queue_name);//设置消息队列的名称
$queue->setFlags(AMQP_DURABLE);//设置消息队列持久化(消息要想持久化,交换机和消息队列必须要持久化)
$queue->declareQueue();//声明消息队列
$queue->bind($exchange_name, $routing_key);//绑定交换机在队列路由
// 消费队列消息
$queue->consume(function ($envelope,$queue){
        $msg = $envelope->getBody();
        var_dump("Received: " . $msg);
        $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
    });

  二、消费者

<?php
// 建立TCP连接对象
$connection = new AMQPConnection([
    'host' => '127.0.0.1',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
]);
$exchange_name="leo.direct";//交换机名称
$queue_name="leo_queue";//队列名称
$routing_key="leo_router";
$connection->connect();//链接rabbit 服务器
$channel = new AMQPChannel($connection);//创建通道
$channel->qos(0,2);
$exchange = new AMQPExchange($channel);//创建交换机
$exchange->setName($exchange_name);//设置交换机名称
$exchange->setType(AMQP_EX_TYPE_DIRECT);//设置交换机类型
$exchange->setFlags(AMQP_DURABLE);//设置交换机持久化
$exchange->declareExchange();//声明交换机
$queue = new \AMQPQueue($channel);//创建消息队列
$queue->setName($queue_name);//设置消息队列的名称
$queue->setFlags(AMQP_DURABLE);//设置消息队列持久化(消息要想持久化,交换机和消息队列必须要持久化)
$queue->declareQueue();//声明消息队列
$queue->bind($exchange_name, $routing_key);//绑定交换机在队列路由
$msg=['msg'=>"hello world",'code'=>200,'datetime'=>date('Y-m-d H:i:s')];//发送的消息
$msgBody = is_array($msg)?json_encode($msg):$msg;
$exchange->publish($msgBody,$routing_key,AMQP_NOPARAM,array('delivery_mode' => 2));