五、RabbitMQ - PHP 操作 RabbitMQ - (topic)主题模式

作者: 温新

分类: 【PHP RabbitMQ】

阅读: 1539

时间: 2023-02-27 07:36:56

hi,我是温新,一名 PHPer

版本:erlang-25.2.1、rabbitmq_server-3.11.9

系统版本:Rocky Linux 9.1

学习目标:PHP 操作 RabbitMQ 之主题模式

本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!

主题交换机

发送到主题交换机的路由键必须是一个以 . 分隔开的词语列表。如users.iduser.age

绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:

* (星号) 用来表示一个单词.

# (井号) 用来表示任意数量(零个或多个)单词。

主题交换机

主题交换机是很强大的,它可以表现出跟其他交换机类似的行为

当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。

* (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

生产者

// 主题模式
public function topicMq()
{
    // 固定.格式,用户后续消费者匹配使用
    // user.info、user.error
    $routingKey = request()->input('mode') ?? 'user.info';
    // 主题模式
    $this->channel->exchange_declare('topic-logs', 'topic',false, false, false);
    $msg = new AMQPMessage($routingKey, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $this->channel->basic_publish($msg, 'topic-logs', $routingKey);
    $this->channel->close();
    $this->mqConnection->close();
}

消费者

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class TopicMq extends Command
{

    protected $signature = 'mq:topic {mode?}';

    protected $description = 'Command description';


    public function handle(): void
    {
        // 接收参数参数用于匹配
        $logMode = $this->argument('mode') ?? '#';

        $connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
        $channel = $connection->channel();
        $channel->exchange_declare('topic-logs','topic', false,false,false);

        list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
        $channel->queue_bind($queueName, 'topic-logs', $logMode);

        $callback = function ($msg) {
            echo '输出: ' . $msg->body . PHP_EOL;
            $msg->ack();
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName,'', false,false,false,false,$callback);

        while ($channel->is_open())
        {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

使用

生产者

创建三个不同等级的日志模式

curl http://la10test.test/mq/topic?mode=user.error
curl http://la10test.test/mq/topic?mode=user.info
curl http://la10test.test/mq/topic?mode=user.warn

消费者

1、使用 # 匹配,# 会接收所有日志

# 消费者
$ php artisan mq:topic "#"
    
# 生产者
curl http://la10test.test/mq/topic?mode=user.error
curl http://la10test.test/mq/topic?mode=user.info
curl http://la10test.test/mq/topic?mode=user.warn

# 消费者输出结果
$ php artisan mq:topic "#"
输出: user.error
输出: user.info
输出: user.warn

2、使用 *.error ,只匹配 后缀为 error 的消息

# 消费者
$ php artisan mq:topic "*.error"
    
# 生产者
$ curl http://la10test.test/mq/topic?mode=user.info
$ curl http://la10test.test/mq/topic?mode=user.error

# 消费者输出结果
$ php artisan mq:topic "*.error"
输出: user.error

3、使用 devices.* ,只接收 devices 开头的队列消息

# 消费者
$ php artisan mq:topic "devices.*"
    
# 生产者
$ curl http://la10test.test/mq/topic?mode=devices.info
$ curl http://la10test.test/mq/topic?mode=user.error

# 消费者输出结果
$ php artisan mq:topic "devices.*"
输出: devices.info
请登录后再评论