七、RabbitMQ - PHP 操作 RabbitMQ - PHP 死信队列操作

作者: 温新

分类: 【PHP RabbitMQ】

阅读: 1406

时间: 2023-02-27 07:40:18

hi,我是温新,一名 PHPer

版本:erlang-25.2.1、rabbitmq_server-3.11.9

系统版本:Rocky Linux 9.1

学习目标:了解死信队列的操作

本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!

死信交换机

DLX(Dead Letter Exchange)死信交换机。当一个消息在队列中变成死信后,会被 publish 到死信交换机,然后再配置一个队列监听它,这个队列就是死信队列。

死信队列的配置需要两个参数:x-dead-letter-exchange'x-dead-letter-routing-key。下面开始演示死信队列的案例。

消费生产者

// 死信队列
public function deadMq()
{
    $data = '死信队列测试-消息被拒绝死信队列';

    /**
     * 正常消息队列
     */
    $this->channel->exchange_declare('my-logs', 'direct',false, false, false);
    $args = new AMQPTable([
        // 信息过期时间
        'x-message-ttl'             => 20000,
        'x-dead-letter-exchange'    => 'dead-exc',
        'x-dead-letter-routing-key' => 'dead-key'
    ]);
    // 通过队列额外参数设置过期时期等配置
    $this->channel->queue_declare('user-log-1', false, true,false,false,false,$args);
    $this->channel->queue_bind('user-log-1', 'my-logs', 'user');


    /**
     * 死信队列的配置
     */
    // 1、声明死信交换机
    $this->channel->exchange_declare('dead-exc','direct', false,false,false);
    // 2、声明死信队列
    $this->channel->queue_declare('dead-log-queue',false,true,false,false);
    // 3、死信队列与死信交换机绑定
    $this->channel->queue_bind('dead-log-queue', 'dead-exc','dead-key');

    // 正常队列发送消息
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $this->channel->basic_publish($msg,'my-logs', 'user');

    $this->channel->close();
    $this->mqConnection->close();
}

上述代码声明两个与之相关的交换机与队列。一个是正常业务的消息队列,一个是死信队列。

1、正常消息队列业务逻辑中,通过额外参数配置过期时间等,让其成为死信队列;

2、通过 x-dead-letter-exchangex-dead-letter-routing-key 配置信息交换机名称与路由键;

3、配置死信队列,交换机是 x-dead-letter-exchange 设置的值;

4、死信队列绑定死信交换机并设置路由键。

消息者(正常业务)

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;

//正常消息者
class LogMq extends Command
{
    protected $signature = 'mq:log';

    protected $description = 'Command description';

    public function handle(): void
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
        $channel = $connection->channel();
        $channel->exchange_declare('my-logs','direct', false,false,false);

        list($queueName, ,) = $channel->queue_declare("", false, false, true, false);

        $channel->queue_bind($queueName, 'my-logs', 'user');

        $callback = function ($msg) {
            echo '输出: ' . $msg->body . PHP_EOL;
            $msg->ack();
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName,'', false,false,false,false,$callback);

        while ($channel->is_open())
        {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

这是一个正常业务的消息消费者。后续我们改动这个消费者来实现死信队列。

死信队列消费者

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class DeadMqLog extends Command
{
    protected $signature = 'mq:dead';

    protected $description = 'Command description';

    public function handle(): void
    {
        $connection =  new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'ziruchu');
        $channel = $connection->channel();
        $channel->exchange_declare('dead-exc','direct', false,false,false);


        $channel->queue_bind('dead-log-queue', 'dead-exc','dead-key');

        $callback = function ($msg) {
            echo '从死信队列中输出的消息: ' . $msg->body . PHP_EOL;
            $msg->ack();
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('dead-log-queue', '', false, false, false, false, $callback);
        while ($channel->is_open())
        {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

这是一个死信队列消费者,当有死信消费时就会走到这里。

死信队列演示

拒绝接收消息

1、修改 LogMq.php 正常消费者代码

// 未变化代码

$callback = function ($msg) {
    // 拒绝接收消息
	$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// 未变化代码

2、效果

# 生产者发布消息
$ curl http://la10test.test/mq/dead
# 正常消费者
$ php artisan mq:log

# 死信队列
$ php artisan mq:dead
从死信队列中输出的消息: 死信队列测试-消息被拒绝死信队列
从死信队列中输出的消息: 死信队列测试-消息被拒绝死信队列

消息过期

利用消息过期可以实现延迟队列的效果

1、修改 LogMq.php 正常消费者代码

$callback = function ($msg) {
    // 睡眠 23 秒,
    sleep(23);
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

2、效果

# 生产者发布消息
$ curl http://la10test.test/mq/dead
# 正常消费者
$ php artisan mq:log

# 死信队列
$ php artisan mq:dead
从死信队列中输出的消息: 死信队列测试-消息被拒绝死信队列
从死信队列中输出的消息: 死信队列测试-消息被拒绝死信队列
从死信队列中输出的消息: 死信队列测试-消息被拒绝死信队列
从死信队列中输出的消息: 死信队列测试-消息被拒绝死信队列

队列长度达到最大长度

1、修改生产者代码配置(deadMq 方法)

$args = new AMQPTable([
    'x-max-length'              => 5,
    'x-overflow'                => 'reject-publish-dlx',
    'x-dead-letter-exchange'    => 'dead-exc',
    'x-dead-letter-routing-key' => 'dead-key'
]);
请登录后再评论