12、Workerman 基本使用 - 分布式通讯组件
12、Workerman 基本使用 - 分布式通讯组件
hi,我是温新,一名 PHPer
Channel 是一个分布式通讯组件,用于完成进程间通讯或者服务器间通讯。
关于 Channel 原理及注意事项什么的,请阅读文档,懒得复制了。
channel 基础使用
1、安装组件
$ composer require workerman/channel --ignore-platform-req=ext-pthreads
2、channel 使用
<?php
/**
* channel-base.php
*
* 分布式通讯组件 - 基础使用
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Channel\Server;
use Channel\Client;
require_once __DIR__ . '/vendor/autoload.php';
// 创建并启动 Channel 服务器
$channelServer = new Server('0.0.0.0', 2206);
// 创建一个 WebSocket Worker 服务,用于接收客户端连接
$worker = new Worker('websocket://0.0.0.0:8888');
$worker->name = 'websocket';
$worker->count = 6;
// 当 Worker 启动时执行此回调函数
$worker->onWorkerStart = function (Worker $worker) {
// 连接到运行在 127.0.0.1:2206 的 Channel 服务器
Client::connect('127.0.0.1', 2206);
// 监听名为 order-broadcast 的频道事件
Client::on('order-broadcast', function ($event) use ($worker) {
// 遍历当前 Worker 进程中所有已建立连接的客户端,并向它们发送广播消息
foreach ($worker->connections as $connection) {
$connection->send($event);
}
});
};
// 当接收到客户端通过 WebSocket 发送的消息时执行此回调函数
$worker->onMessage = function (TcpConnection $connection, $data) {
// 将客户端发送的消息发布到名为 order-broadcast 的频道中
Client::publish('order-broadcast', $data);
};
Worker::runAll();
3、测试
启动服务端程序
$ php channel-base.php start
客户端测试
浏览器中打开 F12,控制台中输入如下代码
ws = new WebSocket("ws://127.0.0.1:8888");
ws.onopen = function () {
ws.send('hello 广播')
}
ws.onmessage = function(e) {
console.log("收到服务端的消息:" + e.data);
};
可以多打开几个终端进行测试。
channel 集群推送
基于 Worker 的多进程(分布式集群)推送系统,集群群发、集群广播。
1、channel 服务
整个系统只能部署一个 channel 服务。假设运行在 192.168.1.1。
<?php
/**
* start-channel.php
*
* 集群推送服务端
*/
use Workerman\Worker;
use Channel\Server;
require_once __DIR__ . '/vendor/autoload.php';
$channelServer = new Server('0.0.0.0', 2206);
Worker::runAll();
2、websocket 服务
整个系统可以部署多个 ws 服务,假设运行在 192.168.1.2 和 192.168.1.3 两台服务器上。
<?php
/**
* start-ws.php
*
* websocket 服务
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Channel\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('websocket://0.0.0.0:4236');
$worker->count = 2;
$worker->name = 'pusher';
$worker->onWorkerStart = function (Worker $worker) {
// Channel 客户端连接到 Channel 服务端
Client::connect('127.0.0.1', 2206);
// 以自己的进程 id 为事件名称
$eventName = $worker->id;
// 订阅名为当前 Worker 进程 ID 的事件,并注册事件处理函数
// 当接收到带有指定目标连接 ID 的消息时,向该连接发送消息
Client::on($eventName, function ($eventData) use ($worker) {
$toConnectionId = $eventData['to_connection_id'];
$message = $eventData['content'];
if (!isset($worker->connections[$toConnectionId])) {
echo 'connection not exists' . PHP_EOL;
return;
}
$toConnection = $worker->connections[$toConnectionId];
$toConnection->send($message);
});
$eventName = '广播';
// 当接收到名为 “广播” 的事件时,向当前 Worker 进程内所有已连接的客户端发送广播消息
Client::on($eventName, function ($eventData) use ($worker) {
$message = $eventData['content'];
foreach ($worker->connections as $connection) {
$connection->send($message);
}
});
};
// 当新的 WebSocket 连接建立时执行此回调函数
$worker->onConnect = function (TcpConnection $connection) use ($worker) {
// 输出连接信息并发送给客户端
$message = 'workerId:' . $worker->id . 'connectionId' . $connection->id . 'contented' . PHP_EOL;
echo $message;
$connection->send($message);
};
Worker::runAll();
3、http 服务
<?php
/**
* start-http.php
*
* http 服务
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Channel\Client;
use Workerman\Protocols\Http\Request;
require_once __DIR__ . '/vendor/autoload.php';
$httpWorker = new Worker('http://0.0.0.0:4327');
$httpWorker->name = 'publisher';
$httpWorker->onWorkerStart = function () {
// HTTP 服务启动时连接到 Channel 频道
Client::connect('127.0.0.1', 2206);
};
$httpWorker->onMessage = function (TcpConnection $connection, Request $request) {
// 兼容Workerman 4.x版本,将请求信息存入 $_GET 全局变量中
if (! is_array($request)) {
$_GET = $request->get();
}
// 向客户端返回"HTTP OK"响应
$connection->send('http ok');
// 如果请求中没有 content 参数,则直接返回
if (empty($_GET['content'])) {
return;
}
// 判断是否指定了目标 Worker 进程 ID 和连接 ID
if (isset($_GET['to_worker_id']) && isset($_GET['to_connection_id'])) {
$eventName = $_GET['to_worker_id'];
$toConnectionId = $_GET['to_connection_id'];
$content = $_GET['content'];
// 发布一条包含目标连接 ID 和内容的消息到指定事件(目标 Worker 进程 ID)下
Client::publish($eventName, [
'to_connection_id' => $toConnectionId,
'content' => $content
]);
} else {
// 若未指定目标 Worker 进程 ID 和连接 ID,则使用'广播'作为事件名
$eventName = '广播';
$content = $_GET['content'];
// 发布一条包含内容的消息到'广播'事件下,进行广播通知
Client::publish($eventName, [
'content'=>$content,
]);
}
};
Worker::runAll();
4、测试
1、启动服务
$ php start-channel.php start
$ php start-ws.php start
$ php start-http.php start
2、websocket 客户端连接
打开浏览器,F12 打开调试控制台,并在 Console 中输入如下代码:
ws = new WebSocket("ws://127.0.0.1:4236");
ws.onmessage = function(e) {
console.log("收到服务端的消息:" + e.data);
};
此时,可以看到控制台中输出的信息:收到服务端的消息:workerId:0connectionId2contented
。
而 start-ws.php
服务端输出如下:
$ php start-ws.php start
...
workerId:0connectionId2contented
3、通过调用 http 接口推送
3.1、在浏览器中访问如下地址。我在终端中进行测试。
# 该 IP 是我本机 IP
$ curl http://192.168.10.58:4327/?content=PHP-and-Laravel
http ok
# 访问之后,查看 websocket 客户端,会看到如下信息:
收到服务端的消息:PHP-and-Laravel # websocket 客户端
3.2、测试 worker_id & connection_id
浏览器中访问 http://192.168.10.58:4327/?to_worker_id=0&to_connection_id=2&content=王美丽
,然后查看 websocket 客户端输入的内容:收到服务端的消息:王美丽
。
分组推送
<?php
/**
* group.php
*
* 使用 channel 和 websocket 实现分组推送
*
* 工作原理:
* 通过 WebSocket 与客户端保持连接,并使用 Channel 进行消息广播,
* 当接收到客户端发送的群组操作命令时,根据命令类型进行相应处理(加入群组或向群组发送消息)
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建 channel 服务
$channelServer = new Channel\Server('0.0.0.0', 2206);
// 创建 websocket 服务
$worker = new Worker('websocket://0.0.0.0:1234');
$worker->count = 8;
// 全局群组到连接的映射数组
$groupConMap = [];
$worker->onWorkerStart = function() use (&$groupConMap) {
// 连接到本地 Channel 服务器
Channel\Client::connect('127.0.0.1', 2206);
// 监听全局分组发送消息事件
Channel\Client::on('send_to_group', function($event_data) use (&$groupConMap) {
$groupId = $event_data['group_id'];
$message = $event_data['message'];
print_r(array_keys($groupConMap));
// 如果存在对应群组的连接集合,则遍历并发送消息
if (isset($groupConMap[$groupId])) {
foreach ($groupConMap[$groupId] as $con) {
$con->send($message);
}
}
});
};
// 当 WebSocket 客户端发送消息时触发此回调函数
$worker->onMessage = function(TcpConnection $con, $data) use (&$groupConMap) {
// 加入群组消息{"cmd":"add_group", "group_id":"123"}
// 或者 群发消息{"cmd":"send_to_group", "group_id":"123", "message":"这个是消息"}
$data = json_decode($data, true);
print_r($data);
$cmd = $data['cmd'];
$groupId = $data['group_id'];
switch($cmd) {
case "add_group":
// 将连接加入到对应的群组数组里
$groupConMap[$groupId][$con->id] = $con;
// 记录这个连接加入了哪些群组,方便在 onclose 的时候清理 $groupConMap 对应群组的数据
$con->group_id = $con->group_id ?? [];
$con->group_id[$groupId] = $groupId;
break;
// 群发消息给群组
case "send_to_group":
// 发布一个"send_to_group"事件,携带群组 ID 和要发送的消息
Channel\Client::publish('send_to_group', array(
'group_id' => $groupId,
'message' => $data['message']
));
break;
}
};
// 这里很重要,连接关闭时把连接从全局群组数据中删除,避免内存泄漏
$worker->onClose = function(TcpConnection $con) use (&$groupConMap) {
// 如果连接记录了所属群组信息
if (isset($con->group_id)) {
// 遍历并从每个群组中移除该连接
foreach ($con->group_id as $groupId) {
unset($groupConMap[$groupId][$con->id]);
if (empty($groupConMap[$groupId])) {
unset($groupConMap[$groupId]);
}
}
}
};
Worker::runAll();
服务端启动后,使用留浏览器充当客户端进行测试,打开 F12,在控制台中输入如下代码:
ws = new WebSocket('ws://127.0.0.1:1234');
ws.onmessage = function(data){console.log(data.data)};
ws.onopen = function() {
ws.send('{"cmd":"add_group", "group_id":"123"}');
ws.send('{"cmd":"send_to_group", "group_id":"123", "message":"这个是消息"}');
};
请登录后再评论