八、RabbitMQ - PHP 操作 MQ - 延迟插件实现延迟队列功能(全章完)

作者: 温新

分类: 【PHP RabbitMQ】

阅读: 2071

时间: 2023-02-27 07:44:50

hi,我是温新,一名 PHPer

版本:erlang-25.2.1、rabbitmq_server-3.11.9

系统版本:Rocky Linux 9.1

学习目标:通过插件来实现延迟队列

本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!

本篇文章主要学习通过插件来实现延迟消息队列。

安装延迟插件

官方地址:https://www.rabbitmq.com/community-plugins.html

延迟插件名称:rabbitmq_delayed_message_exchange

安装插件

cd /usr/local/rabbitmq_server-3.11.9/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez

下载完成后就可以了,什么都可以不动

启用延迟插件

#
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看插件

rabbitmq-plugins list | grep delayed
[E*] rabbitmq_delayed_message_exchange 3.11.1

通过延迟插件使用延时队列

添加交换机

在开始使用之前,还需要添加对应的交换机及交换机参数。当然了,可以使用命令行来添加,也可以使用 web 页面来添加。需要添加的信息如下:

name:dealy-exchange	# 交换机名称(自定义)
Arguments:x-delayed-type:direct # 参数(指定)

查看信息:

# rabbitmqctl list_exchanges --vhost sms
Listing exchanges for vhost sms ...
name    type
delay-exchange  x-delayed-message
        direct

实现延时队列

生产者

// 通过插件实现延迟队列
public function delayMq()
{
    $data = 'hi,我是王美丽,我是通过延迟插件实现的消息延迟推送';
    // 设置类型
    $args = new AMQPTable([
        'x-delayed-type' => 'direct'
    ]);
    // 声明交换机时指定交换机类型
    // x-delayed-message 类型由插件生成
    $this->channel->exchange_declare('delay-exchange','x-delayed-message', false, true, false);
    $this->channel->queue_declare('delay-log-queue',false, true,false,false,false,$args);
    $this->channel->queue_bind('delay-log-queue', 'delay-exchange', 'delay-log-key');

    $msg = new AMQPMessage($data, [
        'delivery_mode'      => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'application_headers' => new AMQPTable([
            // 设置延迟时间(单位:毫秒)
            'x-delay' => 10000
        ])
    ]);
    $this->channel->basic_publish($msg, 'delay-exchange', 'delay-log-key');
    $this->channel->close();
    $this->mqConnection->close();
}

消费者

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 插件延时队列
class DelayMq extends Command
{
    protected $signature = 'mq:delay';

    protected $description = 'Command description';

    public function handle(): void
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
        $channel = $connection->channel();
        $channel->exchange_declare('delay-exchange','x-delayed-message', false,true,false);

        $channel->queue_bind('delay-log-queue', 'delay-exchange', 'delay-log-key');

        $callback = function ($msg) {
            echo '输出: ' . $msg->body . PHP_EOL;
            $msg->ack();
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('delay-log-queue','', false,false,false,false,$callback);

        while ($channel->is_open())
        {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

查看输出结果:

$ php artisan mq:delay
输出: hi,我是王美丽,我是通过延迟插件实现的消息延迟推送
输出: hi,我是王美丽,我是通过延迟插件实现的消息延迟推送
输出: hi,我是王美丽,我是通过延迟插件实现的消息延迟推送

关于 PHP 快速使用 RabbitMQ 已经写完。其目的在于快速上手使用,因此,这仅仅只是一个开始,后续一起加油。

请登录后再评论