四、RabbitMQ - PHP 操作 RabbitMQ - (direct)路由模式

作者: 温新

分类: 【PHP RabbitMQ】

阅读: 1537

时间: 2023-02-27 07:34:02

hi,我是温新,一名 PHPer

版本:erlang-25.2.1、rabbitmq_server-3.11.9

系统版本:Rocky Linux 9.1

学习目标:PHP 操作 RabbitMQ 之路由模式

本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!

路由模式重点在于 Routing Key,我们可以根据这个要实现不同的功能。本篇文章以错误日志为案例进行演示。errorinfo是不同的 Routing Key,它们对应不同的日志,只有绑定到对应 key 的交换机,才能看到对应自己绑定 key 的消息。

绑定 Routing Key

$channel->queue_bind($queue_name, 'logs');

上篇文章,我们使用 queue_bind 绑定交换机和队列的关系。此外它还有第三个参数,就是 routing_key,如下:

$channel->queue_bind($queueName, 'my-logs', 'info');

发送消息

将消息发送到直连交换机,把日志级别作为路由键。这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。

$this->channel->basic_publish($msg,'my-logs', 'info');

介绍了这些,来看看整体案例

生产者(发布者)

// 路由模式
public function directMq()
{
    // 日志模式: info,error
    $logMode = request()->input('mode') ?? 'info';

    $this->channel->exchange_declare('my-logs', 'direct',false, false, false);
    $msg = new AMQPMessage($logMode, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    // 指定 Routing Key
    $this->channel->basic_publish($msg,'my-logs', $logMode);

    $this->channel->close();
    $this->mqConnection->close();
}

消费者(订阅者)

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 路由模式
class DirectMq extends Command
{
    protected $signature = 'mq:direct {mode?}';


    protected $description = 'Command description';


    public function handle(): void
    {
        // 日志模式:info、error
        $logMode = $this->argument('mode') ?? 'info';

        $connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
        $channel = $connection->channel();
        $channel->exchange_declare('my-logs','direct', false,false,false);

        list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
        $channel->queue_bind($queueName, 'my-logs', $logMode);

        $callback = function ($msg) {
            echo '输出: ' . $msg->body . PHP_EOL;
            $msg->ack();
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName,'', false,false,false,false,$callback);

        while ($channel->is_open())
        {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

执行结果:

发布者

http://la10test.test/mq/direct?mode=error
http://la10test.test/mq/direct

订阅者

$ php artisan mq:direct
输出: info
输出: info
输出: info
    
$ php artisan mq:direct error
输出: error
输出: error
输出: error
请登录后再评论