三、RabbitMQ - PHP 操作 RabbitMQ - (fanout)发布订阅模式
hi,我是温新,一名 PHPer
版本:erlang-25.2.1、rabbitmq_server-3.11.9
系统版本:Rocky Linux 9.1
学习目标:PHP 操作 RabbitMQ 之发布订阅模式
本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!
RabbitMQ 发布订阅, 分发一个消息给多个消费者(consumers)。可以对比着 Redis 中的发布订阅进行理解。不同的是 RabbitMQ 多了一个交换机。
本篇案例将演示日志系统。一个生产者生产消息,两个消费者,一个消费者输出消息,一个消费者将消息写扩日志。
交换机
在之前的学习中,我们是直接把消息发送到队列中,而现在使用的是完整的 RabbitMQ 消息模型。
简单模式与工作模式回顾:
1、生产者(producer)是发布消息的应用程序;
2、队列(queue)用于消息存储的缓冲;
3、消费者(consumer)是接收消息的应用程序。
RabbitMQ 消息模型的核心概念是:生产者不会直接发送消息给任何队列。实际上,生产者不知道消息是否已经投递到队列。
生产者(Producer)只需要消息发送给一个交换机(Exchange)。交换机从生产者那里接收消息,然后把消息推送到队列。交换机相当于是一个中间人,它负责把接收到的消息推送到指定的队列。
交换机有四种类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)。我们使用 fanout 来实现发布订阅。
消息生产者
// MqProducerController.php
// 发布订阅
public function fanoutMq()
{
$exchangeName = 'logs';
$data = date('Y-m-d H:i:s') . ' 王美丽来了';
// 1、声明交换机
$this->channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 2、把消息推送到交换机
$this->channel->basic_publish($msg, 'logs');
$this->channel->close();
$this->mqConnection->close();
}
生产者完成后可以调用看看,然后查看 RabbitMQ web 管理工具的 Exchange
,会发现多了一个 logs
的交换机。
订阅者
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 发布订阅模式-订阅者
class PublishMq extends Command
{
/**
* 使用参数来模拟多个消费者
*
* 不传参数,则把消息在屏幕中输出;
* 传递参数,则把消息写入日志文件
*/
protected $signature = 'mq:sub {num?}';
protected $description = 'Command description';
public function handle(): void
{
$num = $this->argument('num');
$connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare('logs', 'fanout', false, false, false);
// 获取队列名称并绑定交换机
list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queueName, 'logs');
$callback = function ($msg) use ($num) {
$body = $msg->body;
if (is_null($num)) {
// 屏幕中输出
echo $body . PHP_EOL;
} else {
Log::info($body);
echo '写入日志成功' . PHP_EOL;
}
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName,'', false,false,false,false,$callback);
while ($channel->is_open())
{
$channel->wait();
}
$channel->close();
$connection->close();
}
}
先运行起来,然后再对代码进行说明。运行消费者,如下:
# 消费者 1
$ php artisan mq:sub
2023-02-26 11:12:15 王美丽来了
# 消费者 2
$ php artisan mq:sub 1
写入日志成功
临时队列
之前的案例中在使用创建队列。从本案例中可以看到,已经没有创建队列。那么该怎么办?
首先,当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)
第二步,当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。
$channel->queue_declare("", false, false, true, false);
我们在声明队列的时候,队列名为空了,此时会生成一个包含随机的 RabbitMQ 队列名称。例如,类似 amq.gen-nX_MJuC_jPkY3zEc1nnQew。
再来回顾一下订阅的流程:
1、声明交换机;
2、声明队列(随机)
3、队列绑定交换机