二、RabbitMQ - PHP 操作 RabbitMQ - 工作模式
hi,我是温新,一名 PHPer
版本:erlang-25.2.1、rabbitmq_server-3.11.9
系统版本:Rocky Linux 9.1
学习目标:PHP 操作 RabbitMQ 之工作模式
本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!
该系列文章采用 Laravel 10
& "php-amqplib/php-amqplib": "^3.5"
学习演示。
在这篇教程中,我们将创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Worker)。
工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
工作模式-系统自动确认
消息生产者
// MqProducerController.php
// 工作模式
public function workerMp()
{
$this->channel->queue_declare('send_sms', false, true, false, false);
for ($i = 0; $i < 10; $i++) {
// 注意,引入了 Str 门面
$data = $i . ' hi, 我是王美丽 ' . Str::random();
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$this->channel->basic_publish($msg, '', 'worker_task');
}
$this->channel->close();
$this->mqConnection->close();
}
不要疑惑,生产消息者就是没有发生变化。
消息消费者
<?php
// App\Console\Commands\WorkerMq.php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 工作模式,消息消费者
class WorkerMq extends Command
{
protected $signature = 'mq:worker-client';
protected $description = 'Command description';
public function handle(): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
$channel = $connection->channel();
$channel->queue_declare('worker_task', false, true, false, false);
$callback = function ($msg) {
sleep(1);
// 输出消费的消息
echo $msg->body . PHP_EOL;
};
# 自动确认回复
$channel->basic_consume('worker_task', '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
不要疑惑,消费者同样没有发生变化。下面来演示多个消费的情况。多开几个 DOM 窗口,执行如下命令:
# 命令行执行
php artisan mq:worker-client
执行结果
# 窗口1
$ php artisan mq:worker-client
0 hi, 我是王美丽 rlPdYsygYapGP65e
2 hi, 我是王美丽 5ORjr9dq4orhjIgq
4 hi, 我是王美丽 P2ANiGvXZo8kgoaY
6 hi, 我是王美丽 vNKXeqo4mYOuRrXt
8 hi, 我是王美丽 ncdlldEH5R4tQX4f
# 窗口2
$ php artisan mq:worker-client
1 hi, 我是王美丽 jSGhGqXwJOhH3HsB
3 hi, 我是王美丽 7FdmlO7EfzlfmSSt
5 hi, 我是王美丽 nT2zTAjYxyWHYkBJ
7 hi, 我是王美丽 XnhaYEkZheb3xI0l
9 hi, 我是王美丽 f5X8EQ4ElOlZlf94
乍一看,和简单模式没有什么区别,就是多开了几个窗口。
现在就要说说问题了,我们似乎什么也没有做,那是因为我们在消费中使用了自动确认,第三个参数为 true,使用自动确认。接收下的案例将使用手动确认。
# 自动确认回复
$channel->basic_consume('worker_task', '', false, true, false, false, $callback);
工作模式- ack 手动确认
修改为手动确认,不需要要有太大的改动,因此列出片段代码。
// App\Console\Commands\WorkerMq.php
public function handle(): void
{
// 未改动,省略
$callback = function ($msg) {
sleep(1);
// 输出消费的消息
echo $msg->body . PHP_EOL;
// 第三步:手动确认消息
$msg->ack();
};
// 第一步:修改为手动确认
$channel->basic_consume('worker_task', '', false, false, false, false, $callback);
// 第二步:收到确认后再处理下一条
$channel->basic_qos(null, 1, null);
// 未改动,省略
}
这样修改完成后就变成了手动确认了。但是中间的状态需要有所了解,因此,可以按照如下步骤进行观察:
第一步:修改为手动确认 后,走一遍生产者与消费者的流程,然后查看 web 管理页面中队列的状态,会显示为 Unacked
;
第二步:basic_qos(null, 1, null);
只有收到回复后再处理一条消息。这里可以再走一遍生产与消费流程,会发现只一条消息被打印出来;
第三步:确认消息,$msg->ack();
,如此,手动确认完成。
关于调度
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。
通过案例输出结果,可以发现挺有规律的。默认情况下,RabbitMQ 会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。
关于消息 ack 确认
当消费被 RabbitMQ 发送给消费者后,就会从内存中移除。因此,要是消费者突然挂了,消息也就消失了。
我们当然不希望消费被丢失,当一个 worker 挂了之后,希望任务重新发送给其他工作者。为了防止消息丢失,RabbitMQ 提供了消息响应(acknowledgments)。消费者通过一个 ack(响应),告诉 RabbitMQ 已收到并处理了该条消息,然后 RabbitMQ 就会释放并删除该消息。
若消费者突然挂了,没有发送 ack 确认,RabbitMQ 会认为消息没有被完全处理,然后就会重新发送给其他消费者。
消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
消息持久化
如果不设置持久化,一旦崩溃,将丢失所有的队列和消息。为了防止消息丢失,有两个事情需要注意:必须持久化队列
和消息
。
持久化设置:设置第三个参数为 true
#
$channel->queue_declare('sms', false, true, false, false);
公平调度
#
$channel->basic_qos(null, 1, null);
basic_qos
告诉 RabbitMQ,同一时刻,不要发送超过 1 条消息给一个 worker,知道它已经处理完上一条消息并做了 ack 确认。