一、RabbitMQ - PHP 操作 RabbitMQ - 简单模式
hi,我是温新,一名 PHPer
版本:erlang-25.2.1、rabbitmq_server-3.11.9
系统版本:Rocky Linux 9.1
学习目标:PHP 操作 RabbitMQ 之简单模式
本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!
该系列文章采用 Laravel 10
& "php-amqplib/php-amqplib": "^3.6.1"
学习演示。
简单模式中,需要编写两个程序,一个是消息生产者,一个是消费消费者。
简单模式生产者
<?php
namespace App\Http\Controllers;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MqProducerController extends Controller
{
private $mqConnection;
private $channel;
public function __construct()
{
// 1、建立连接
$this->mqConnection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
// 2、创建通道
$this->channel = $this->mqConnection->channel();
}
public function simpleMq()
{
// 3、创建队列
$this->channel->queue_declare('send_sms', false, true, false, false);
$data = 'hi, 我是王美丽';
// 4、发送消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$this->channel->basic_publish($msg, '', 'send_sms');
// 5、关闭通道
$this->channel->close();
// 关闭连接
$this->mqConnection->close();
}
}
下面对代码进行解释:
第一步:建立连接
#
$this->mqConnection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
连接抽象的套接字连接,负责协议版本和身份验证等等。这里我们在本地机器上连接到一个代理 - 指定为localhost。如果我们想连接到另一台计算机上一个代理我们只需简单地指定其名称或IP地址。
第二步:创建通道
大部分的 API 都在通道中完成。
$this->channel = $this->mqConnection->channel();
第三步:创建消息队列
通道创建完成后,就可以发送消息,发送消息需要基于一个队列,因此通过队列来发送消息,并指定队列名称。
$this->channel->queue_declare('send_sms', false, true, false, false);
参数说明:
参数 | 默认值 | 说明 |
---|---|---|
$queue | '' | 队列名称(唯一) |
$passive | false | **被动模式。**为 true 时,如时 $queue 不存在,则返回错误(不创建队列,只检测队列是否存在);为 false 时,若 $queue 不存在,则创建此队列,然后返回 OK |
$durable | false | **队列持久化。**为 true 时,消费将存入数据库,即使服务崩溃,消息也不会消失 |
$exclusive | false | 排他性。为 true 时,只能在本次连接中使用,连接关闭时自动消亡(即使 $durable 为 true) |
$auto_delete | true | 自动消亡。为 true 时,当队列不再有订阅者时,会自动消亡 |
$nowait | false | 异步执行。为 true 时,不等待队列创建结果,立即完成函数调用 |
$arguments | array | 设置消息队列的额外参数,如存储时间等 |
$ticket | null | 传 0 或 null |
第四步:发送消息
发送消息,并将消息持久化
// 处理消息并将消息持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 将消息推送到队列中
$this->channel->basic_publish($msg, '', 'send_sms');
第五步:关闭连接
$this->channel->close();
$this->mqConnection->close();
第六步:执行
代码完成后,可以通过浏览器访问到这个方法,生产一条消息。
然后打开 RabbitMQ WEB 工具,就可以看到消息了。
消息消费者
关于消费者,连接的参数是一样的,不同是使用的方法。
消费者这里将使用 artisan command
的形式来使用,因此我创建一个自定义命令。
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class SimpleMqClient extends Command
{
protected $signature = 'mq:test-simple-client';
protected $description = 'Command description';
public function handle(): void
{
// 1、创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
// 2、创建通道
$channel = $connection->channel();
// 3、创建队列
$channel->queue_declare('send_sms', false, true, false, false);
$callback = function ($msg) {
// 输出消费的消息
// 后续逻辑处理
echo $msg->body . PHP_EOL;
};
// 4、消费者使用消息
$channel->basic_consume('send_sms', '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
// 5、关闭连接
$channel->close();
$connection->close();
}
}
消费者使用消息参数说明
$channel->basic_consume('send_sms', '', false, true, false, false, $callback);
参数说明
参数 | 默认值 | 说明 |
---|---|---|
$queue | 队列名称 | |
$consumer_tag | **消费者标签。**用于区分多个消费者 | |
$no_local | false | AMQP 标准,但 RabbitMQ 没有实现 |
$no_ack | false | **收到消息后,是否不需要回复确认即被认为被消费。**为 true 时,表示自动应答;为 false 时,表示手动应答 |
$exclusive | false | 设置是否排他。排他消费者,即这个队列只能由一个消费者消费,适用于任务不允许进行并发处理的情况 |
$nowait | false | 为 true 时,表示不等待服务器回执信息,函数将返回 NULL,若开启了排序,则必须等待结果 |
$callback | null | 回调函数 |
$arguments | array | 额外配置参数 |
消费者执行
php artisan mq:test-simple-client
在命令行中执行该命令,会一直处于阻塞状态,有生产者生产的消费就会被读出。
$ php artisan mq:test-simple-client
hi, 我是王美丽
hi, 我是王美丽
hi, 我是王美丽
请登录后再评论