三、RabbitMQ - PHP 操作 RabbitMQ - (fanout)发布订阅模式

作者: 温新

分类: 【PHP RabbitMQ】

阅读: 1242

时间: 2023-02-27 07:32:19

hi,我是温新,一名 PHPer

版本:erlang-25.2.1、rabbitmq_server-3.11.9

系统版本:Rocky Linux 9.1

学习目标:PHP 操作 RabbitMQ 之发布订阅模式

本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!

RabbitMQ 发布订阅, 分发一个消息给多个消费者(consumers)。可以对比着 Redis 中的发布订阅进行理解。不同的是 RabbitMQ 多了一个交换机。

本篇案例将演示日志系统。一个生产者生产消息,两个消费者,一个消费者输出消息,一个消费者将消息写扩日志。

交换机

在之前的学习中,我们是直接把消息发送到队列中,而现在使用的是完整的 RabbitMQ 消息模型。

简单模式与工作模式回顾:

​ 1、生产者(producer)是发布消息的应用程序;

​ 2、队列(queue)用于消息存储的缓冲;

​ 3、消费者(consumer)是接收消息的应用程序。

RabbitMQ 消息模型的核心概念是:生产者不会直接发送消息给任何队列。实际上,生产者不知道消息是否已经投递到队列。

生产者(Producer)只需要消息发送给一个交换机(Exchange)。交换机从生产者那里接收消息,然后把消息推送到队列。交换机相当于是一个中间人,它负责把接收到的消息推送到指定的队列。

交换机有四种类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)。我们使用 fanout 来实现发布订阅。

消息生产者

// MqProducerController.php
// 发布订阅
public function fanoutMq()
{
    $exchangeName = 'logs';
    $data         = date('Y-m-d H:i:s') . ' 王美丽来了';

    // 1、声明交换机
    $this->channel->exchange_declare($exchangeName, 'fanout', false, false, false);
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    // 2、把消息推送到交换机
    $this->channel->basic_publish($msg, 'logs');

    $this->channel->close();
    $this->mqConnection->close();
}

生产者完成后可以调用看看,然后查看 RabbitMQ web 管理工具的 Exchange ,会发现多了一个 logs 的交换机。

订阅者

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 发布订阅模式-订阅者
class PublishMq extends Command
{
    /**
     * 使用参数来模拟多个消费者
     *
     * 不传参数,则把消息在屏幕中输出;
     * 传递参数,则把消息写入日志文件
     */
    protected $signature = 'mq:sub {num?}';

    protected $description = 'Command description';


    public function handle(): void
    {
        $num = $this->argument('num');

        $connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
        $channel = $connection->channel();

        // 声明交换机
        $channel->exchange_declare('logs', 'fanout', false, false, false);
        // 获取队列名称并绑定交换机
        list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
        $channel->queue_bind($queueName, 'logs');

        $callback = function ($msg) use ($num) {
            $body = $msg->body;
            if (is_null($num)) {
                // 屏幕中输出
                echo $body . PHP_EOL;
            } else {
                Log::info($body);
                echo '写入日志成功' . PHP_EOL;
            }
            $msg->ack();
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName,'', false,false,false,false,$callback);

        while ($channel->is_open())
        {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

先运行起来,然后再对代码进行说明。运行消费者,如下:

# 消费者 1
$ php artisan mq:sub
2023-02-26 11:12:15 王美丽来了

# 消费者 2
$ php artisan mq:sub 1
写入日志成功

临时队列

之前的案例中在使用创建队列。从本案例中可以看到,已经没有创建队列。那么该怎么办?

首先,当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)

第二步,当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。

$channel->queue_declare("", false, false, true, false);

我们在声明队列的时候,队列名为空了,此时会生成一个包含随机的 RabbitMQ 队列名称。例如,类似 amq.gen-nX_MJuC_jPkY3zEc1nnQew。

再来回顾一下订阅的流程:

1、声明交换机;

2、声明队列(随机)

3、队列绑定交换机

请登录后再评论