在laravel框架中使用mq
本文写于2018-11-28
1、部署laravel项目
https://github.com/laravel/laravel 通过git克隆项目,或者下载zip包然后解压等方式都可以把laravel框架源码下载下来。
然后composer install 安装各种依赖
然后复制.env.example 为.env文件,执行php artisan key:generate 生成APP_KEY
2、上传文件到github【备注:这一步可以略过】
1)git bash 到项目目录,然后git init,初始化本地仓库
2)git remote add origin 远程库地址
3)git add . 把文件缓存到缓冲区
4)git commit -m \’初始提交\’
5)git push origin master 推送本地代码到远程库master分支
3、安装mq依赖
从 https://packagist.org/?query=rabbitmq 选择一个依赖包
从上面截图看到,能用的就是第一个和最后一个依赖。暂时先使用第一个依赖,后续有时间补充下第二个依赖的使用。
composer require php-amqplib/php-amqplib 即可安装依赖
4、laravel框架中使用mq
1)在config目录下新增mq.php,文件内容:
<?php return [ \'host\' => env(\'MQ_HOST\', \'127.0.0.1\'), \'port\' => env(\'MQ_PORT\', 5672), \'user\' => env(\'MQ_USER\', \'guest\'), \'password\' => env(\'MQ_PASSWORD\', \'guest\'), \'queue\' => env(\'MQ_QUEUE\', \'default\'), \'exchange\' => env(\'MQ_EXCHANGE\', \'default\'), \'key\' => env(\'MQ_KEY\', \'default\'), ];
具体的配置信息,可以在.env文件中配置,也可以修改这个文件。
2)新增MqSend.php文件,路径:app/Console/Commands
<?php namespace App\Console\Commands; use Illuminate\Console\Command; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class MqSend extends Command { /** * 控制台命令 signature 的名称。 * * @var string */ protected $signature = \'mq:send {msg}\'; /** * 控制台命令说明。 * * @var string */ protected $description = \'send messages to rabbitMQ\'; /** * 创建一个新的命令实例。 * * @return void */ public function __construct() { parent::__construct(); } /** * 执行控制台命令。 * * @return mixed */ public function handle() { $host = config(\'mq.host\'); $port = config(\'mq.port\'); $user = config(\'mq.user\'); $password = config(\'mq.password\'); $queue = config(\'mq.queue\'); // 队列名称 $exchange = config(\'mq.exchange\'); // 交换机名称 $key = config(\'mq.key\'); // 队列绑定交换机时配置的routingKey $connection = new AMQPConnection($host, $port, $user, $password); $channel = $connection->channel(); /** * 如果管理后台上已经配置了交换机、队列,以及绑定了关系,则不需要下面的3条语句 */ $channel->exchange_declare($exchange, \'direct\', false, true, false); // 初始化交换机 $channel->queue_declare($queue, false, true, false, false); // 初始化队列 $channel->queue_bind($queue, $exchange, $key); // 将队列与某个交换机进行绑定,并使用路由关键字 $msg = \'[\' . date(\'Y-m-d H:i:s\') . \'] \' . $this->argument(\'msg\'); $data = new AMQPMessage($msg, [\'delivery_mode\' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $channel->basic_publish($data, $exchange, $queue); echo "[X] Sent: $msg \n"; $channel->close(); $connection->close(); } }
3)新增MqReceive.php文件, 路径:app/Console/Commands
<?php namespace App\Console\Commands; use Illuminate\Console\Command; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class MqReceive extends Command { /** * 控制台命令 signature 的名称。 * * @var string */ protected $signature = \'mq:receive\'; /** * 控制台命令说明。 * * @var string */ protected $description = \'receive messages to rabbitMQ\'; /** * 创建一个新的命令实例。 * * @return void */ public function __construct() { parent::__construct(); } /** * 执行控制台命令。 * * @return mixed */ public function handle() { $host = config(\'mq.host\'); $port = config(\'mq.port\'); $user = config(\'mq.user\'); $password = config(\'mq.password\'); $queue = config(\'mq.queue\'); // 队列名称 $exchange = config(\'mq.exchange\'); // 交换机名称 $key = config(\'mq.key\'); // 队列绑定交换机时配置的routingKey $connection = new AMQPConnection($host, $port, $user, $password); $channel = $connection->channel(); /** * 如果管理后台上已经配置了交换机、队列,以及绑定了关系,则不需要下面的3条语句 */ $channel->queue_declare($queue, false, true, false, false); echo \' [*] Waiting for messages. To exit press CTRL+C\', "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume($queue, \'\', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } }
4)修改app/console/Kernel.php文件,在$commands数组中增加:
protected $commands = [ Commands\MqSend::class, Commands\MqReceive::class, ];
这样子就能在php artisan看到有mq的命令:
5)生产者发布消息
发布成功。代码中设置了交换机、队列名、理由关键词,这些默认值都是default,在rabbitMQ管理后台可以看到有新增了交换机、队列,队列里面也有消息。
【备注】windows系统上可能执行生产者脚本会报错:
这是因为windows不支持这个SOCKET_EAGAIN常量。
参考:https://github.com/php-amqplib/php-amqplib/issues/619
要改下vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php文件:
// self::$ERRNO_EQUALS_EAGAIN = \'errno=\' . SOCKET_EAGAIN; // windows不支持SOCKET_EAGAIN,所以会出现未定义的报错。在linux上SOCKET_EAGAIN是SOCKET_EWOULDBLOCK的别名 // https://github.com/php-amqplib/php-amqplib/issues/619 使用SOCKET_EWOULDBLOCK替换 self::$ERRNO_EQUALS_EAGAIN = \'errno=\' . (defined(\'SOCKET_EAGAIN\') ? SOCKET_EAGAIN : SOCKET_EWOULDBLOCK); self::$ERRNO_EQUALS_EWOULDBLOCK = \'errno=\' . SOCKET_EWOULDBLOCK; self::$ERRNO_EQUALS_EINTR = \'errno=\' . SOCKET_EINTR;
修改完之后就没问题了。
6)消费者接收消息
消息有2条,都接收到了。按Ctrl+C可以退出消费者,因为消费是阻塞,一直在等待接收消息。
5、参考文档:
1)laravel中新增artisan命令:https://laravel-china.org/docs/laravel/5.7/artisan/2276
2)laravel中使用 php-amqplib/php-amqplib 依赖包
https://segmentfault.com/a/1190000012308675
https://segmentfault.com/a/1190000011825148