八、RabbitMQ - PHP 操作 MQ - 延迟插件实现延迟队列功能(全章完)
hi,我是温新,一名 PHPer
版本:erlang-25.2.1、rabbitmq_server-3.11.9
系统版本:Rocky Linux 9.1
学习目标:通过插件来实现延迟队列
本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!
本篇文章主要学习通过插件来实现延迟消息队列。
安装延迟插件
官方地址:https://www.rabbitmq.com/community-plugins.html
延迟插件名称:rabbitmq_delayed_message_exchange
安装插件
cd /usr/local/rabbitmq_server-3.11.9/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载完成后就可以了,什么都可以不动
启用延迟插件
#
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看插件
rabbitmq-plugins list | grep delayed
[E*] rabbitmq_delayed_message_exchange 3.11.1
通过延迟插件使用延时队列
添加交换机
在开始使用之前,还需要添加对应的交换机及交换机参数。当然了,可以使用命令行来添加,也可以使用 web 页面来添加。需要添加的信息如下:
name:dealy-exchange # 交换机名称(自定义)
Arguments:x-delayed-type:direct # 参数(指定)
查看信息:
# rabbitmqctl list_exchanges --vhost sms
Listing exchanges for vhost sms ...
name type
delay-exchange x-delayed-message
direct
实现延时队列
生产者
// 通过插件实现延迟队列
public function delayMq()
{
$data = 'hi,我是王美丽,我是通过延迟插件实现的消息延迟推送';
// 设置类型
$args = new AMQPTable([
'x-delayed-type' => 'direct'
]);
// 声明交换机时指定交换机类型
// x-delayed-message 类型由插件生成
$this->channel->exchange_declare('delay-exchange','x-delayed-message', false, true, false);
$this->channel->queue_declare('delay-log-queue',false, true,false,false,false,$args);
$this->channel->queue_bind('delay-log-queue', 'delay-exchange', 'delay-log-key');
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable([
// 设置延迟时间(单位:毫秒)
'x-delay' => 10000
])
]);
$this->channel->basic_publish($msg, 'delay-exchange', 'delay-log-key');
$this->channel->close();
$this->mqConnection->close();
}
消费者
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 插件延时队列
class DelayMq extends Command
{
protected $signature = 'mq:delay';
protected $description = 'Command description';
public function handle(): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
$channel = $connection->channel();
$channel->exchange_declare('delay-exchange','x-delayed-message', false,true,false);
$channel->queue_bind('delay-log-queue', 'delay-exchange', 'delay-log-key');
$callback = function ($msg) {
echo '输出: ' . $msg->body . PHP_EOL;
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('delay-log-queue','', false,false,false,false,$callback);
while ($channel->is_open())
{
$channel->wait();
}
$channel->close();
$connection->close();
}
}
查看输出结果:
$ php artisan mq:delay
输出: hi,我是王美丽,我是通过延迟插件实现的消息延迟推送
输出: hi,我是王美丽,我是通过延迟插件实现的消息延迟推送
输出: hi,我是王美丽,我是通过延迟插件实现的消息延迟推送
关于 PHP 快速使用 RabbitMQ 已经写完。其目的在于快速上手使用,因此,这仅仅只是一个开始,后续一起加油。
请登录后再评论