12、Workerman 基本使用 - 分布式通讯组件

作者: 温新

图书: 【Workerman 基本使用】

阅读: 141

时间: 2024-04-23 14:06:55

12、Workerman 基本使用 - 分布式通讯组件

hi,我是温新,一名 PHPer

Channel 是一个分布式通讯组件,用于完成进程间通讯或者服务器间通讯。

关于 Channel 原理及注意事项什么的,请阅读文档,懒得复制了。

channel 基础使用

1、安装组件

$ composer require workerman/channel --ignore-platform-req=ext-pthreads

2、channel 使用

<?php
/**
 * channel-base.php
 *
 * 分布式通讯组件 - 基础使用
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Channel\Server;
use Channel\Client;

require_once __DIR__ . '/vendor/autoload.php';

// 创建并启动 Channel 服务器
$channelServer = new Server('0.0.0.0', 2206);

// 创建一个 WebSocket Worker 服务,用于接收客户端连接
$worker = new Worker('websocket://0.0.0.0:8888');

$worker->name  = 'websocket';
$worker->count = 6;

// 当 Worker 启动时执行此回调函数
$worker->onWorkerStart = function (Worker $worker) {
    // 连接到运行在 127.0.0.1:2206 的 Channel 服务器
    Client::connect('127.0.0.1', 2206);
    // 监听名为 order-broadcast 的频道事件
    Client::on('order-broadcast', function ($event) use ($worker) {
        // 遍历当前 Worker 进程中所有已建立连接的客户端,并向它们发送广播消息
        foreach ($worker->connections as $connection) {
            $connection->send($event);
        }
    });
};

// 当接收到客户端通过 WebSocket 发送的消息时执行此回调函数
$worker->onMessage = function (TcpConnection $connection, $data) {
    // 将客户端发送的消息发布到名为 order-broadcast 的频道中
    Client::publish('order-broadcast', $data);
};

Worker::runAll();

3、测试

启动服务端程序

$ php channel-base.php start

客户端测试

浏览器中打开 F12,控制台中输入如下代码

ws = new WebSocket("ws://127.0.0.1:8888");

ws.onopen = function () {
    ws.send('hello 广播')
}

ws.onmessage = function(e) {
    console.log("收到服务端的消息:" + e.data);
};

可以多打开几个终端进行测试。

channel 集群推送

基于 Worker 的多进程(分布式集群)推送系统,集群群发、集群广播。

1、channel 服务

整个系统只能部署一个 channel 服务。假设运行在 192.168.1.1。

<?php
/**
 * start-channel.php
 *
 * 集群推送服务端
 */

use Workerman\Worker;
use Channel\Server;

require_once __DIR__ . '/vendor/autoload.php';

$channelServer = new Server('0.0.0.0', 2206);

Worker::runAll();
2、websocket 服务

整个系统可以部署多个 ws 服务,假设运行在 192.168.1.2 和 192.168.1.3 两台服务器上。

<?php
/**
 * start-ws.php
 *
 * websocket 服务
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Channel\Client;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('websocket://0.0.0.0:4236');

$worker->count = 2;
$worker->name  = 'pusher';

$worker->onWorkerStart = function (Worker $worker) {
    // Channel 客户端连接到 Channel 服务端
    Client::connect('127.0.0.1', 2206);
    // 以自己的进程 id 为事件名称
    $eventName = $worker->id;

    // 订阅名为当前 Worker 进程 ID 的事件,并注册事件处理函数
    // 当接收到带有指定目标连接 ID 的消息时,向该连接发送消息
    Client::on($eventName, function ($eventData) use ($worker) {
        $toConnectionId = $eventData['to_connection_id'];
        $message = $eventData['content'];

        if (!isset($worker->connections[$toConnectionId])) {
            echo 'connection not exists' . PHP_EOL;

            return;
        }

        $toConnection = $worker->connections[$toConnectionId];

        $toConnection->send($message);
    });

    $eventName = '广播';

    // 当接收到名为 “广播” 的事件时,向当前 Worker 进程内所有已连接的客户端发送广播消息
    Client::on($eventName, function ($eventData) use ($worker) {
        $message = $eventData['content'];

        foreach ($worker->connections as $connection) {
            $connection->send($message);
        }
    });
};

// 当新的 WebSocket 连接建立时执行此回调函数
$worker->onConnect = function (TcpConnection $connection) use ($worker) {
    // 输出连接信息并发送给客户端
    $message = 'workerId:' . $worker->id . 'connectionId' . $connection->id . 'contented' . PHP_EOL;
    echo $message;

    $connection->send($message);
};

Worker::runAll();
3、http 服务
<?php
/**
 * start-http.php
 *
 * http 服务
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Channel\Client;
use Workerman\Protocols\Http\Request;

require_once __DIR__ . '/vendor/autoload.php';

$httpWorker = new Worker('http://0.0.0.0:4327');

$httpWorker->name = 'publisher';

$httpWorker->onWorkerStart = function () {
    // HTTP 服务启动时连接到 Channel 频道
    Client::connect('127.0.0.1', 2206);
};

$httpWorker->onMessage = function (TcpConnection $connection, Request $request) {
    // 兼容Workerman 4.x版本,将请求信息存入 $_GET 全局变量中
    if (! is_array($request)) {
        $_GET = $request->get();
    }

    // 向客户端返回"HTTP OK"响应
    $connection->send('http ok');
    // 如果请求中没有 content 参数,则直接返回
    if (empty($_GET['content'])) {
        return;
    }

    // 判断是否指定了目标 Worker 进程 ID 和连接 ID
    if (isset($_GET['to_worker_id']) && isset($_GET['to_connection_id'])) {
        $eventName      = $_GET['to_worker_id'];
        $toConnectionId = $_GET['to_connection_id'];
        $content        = $_GET['content'];

        // 发布一条包含目标连接 ID 和内容的消息到指定事件(目标 Worker 进程 ID)下
        Client::publish($eventName, [
            'to_connection_id' => $toConnectionId,
            'content'          => $content
        ]);
    } else {
        // 若未指定目标 Worker 进程 ID 和连接 ID,则使用'广播'作为事件名
        $eventName = '广播';
        $content   = $_GET['content'];

        // 发布一条包含内容的消息到'广播'事件下,进行广播通知
        Client::publish($eventName, [
            'content'=>$content,
        ]);
    }
};

Worker::runAll();

4、测试

1、启动服务

$ php start-channel.php start
$ php start-ws.php start
$ php start-http.php start

2、websocket 客户端连接

打开浏览器,F12 打开调试控制台,并在 Console 中输入如下代码:

ws = new WebSocket("ws://127.0.0.1:4236");

ws.onmessage = function(e) {
    console.log("收到服务端的消息:" + e.data);
};

此时,可以看到控制台中输出的信息:收到服务端的消息:workerId:0connectionId2contented

start-ws.php 服务端输出如下:

$ php start-ws.php start
...
workerId:0connectionId2contented

3、通过调用 http 接口推送

3.1、在浏览器中访问如下地址。我在终端中进行测试。

# 该 IP 是我本机 IP
$ curl http://192.168.10.58:4327/?content=PHP-and-Laravel
http ok

# 访问之后,查看 websocket 客户端,会看到如下信息:
收到服务端的消息:PHP-and-Laravel # websocket 客户端

3.2、测试 worker_id & connection_id

浏览器中访问 http://192.168.10.58:4327/?to_worker_id=0&to_connection_id=2&content=王美丽,然后查看 websocket 客户端输入的内容:收到服务端的消息:王美丽

分组推送

<?php
/**
 * group.php
 *
 * 使用 channel 和 websocket 实现分组推送
 *
 * 工作原理:
 *  通过 WebSocket 与客户端保持连接,并使用 Channel 进行消息广播,
 *  当接收到客户端发送的群组操作命令时,根据命令类型进行相应处理(加入群组或向群组发送消息)
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

require_once __DIR__ . '/vendor/autoload.php';

// 创建 channel 服务
$channelServer = new Channel\Server('0.0.0.0', 2206);

// 创建 websocket 服务
$worker = new Worker('websocket://0.0.0.0:1234');

$worker->count = 8;

// 全局群组到连接的映射数组
$groupConMap = [];

$worker->onWorkerStart = function() use (&$groupConMap) {
    // 连接到本地 Channel 服务器
    Channel\Client::connect('127.0.0.1', 2206);

    // 监听全局分组发送消息事件
    Channel\Client::on('send_to_group', function($event_data) use (&$groupConMap) {
        $groupId = $event_data['group_id'];
        $message = $event_data['message'];

        print_r(array_keys($groupConMap));

        // 如果存在对应群组的连接集合,则遍历并发送消息
        if (isset($groupConMap[$groupId])) {
            foreach ($groupConMap[$groupId] as $con) {
                $con->send($message);
            }
        }
    });
};

// 当 WebSocket 客户端发送消息时触发此回调函数
$worker->onMessage = function(TcpConnection $con, $data) use (&$groupConMap) {
    // 加入群组消息{"cmd":"add_group", "group_id":"123"}
    // 或者 群发消息{"cmd":"send_to_group", "group_id":"123", "message":"这个是消息"}
    $data = json_decode($data, true);
    print_r($data);

    $cmd     = $data['cmd'];
    $groupId = $data['group_id'];
    switch($cmd) {
        case "add_group":
            // 将连接加入到对应的群组数组里
            $groupConMap[$groupId][$con->id] = $con;

            // 记录这个连接加入了哪些群组,方便在 onclose 的时候清理 $groupConMap 对应群组的数据
            $con->group_id           = $con->group_id ?? [];
            $con->group_id[$groupId] = $groupId;
            break;
        // 群发消息给群组
        case "send_to_group":
            // 发布一个"send_to_group"事件,携带群组 ID 和要发送的消息
            Channel\Client::publish('send_to_group', array(
                'group_id' => $groupId,
                'message'  => $data['message']
            ));
            break;
    }
};

// 这里很重要,连接关闭时把连接从全局群组数据中删除,避免内存泄漏
$worker->onClose = function(TcpConnection $con) use (&$groupConMap) {
    // 如果连接记录了所属群组信息
    if (isset($con->group_id)) {
        // 遍历并从每个群组中移除该连接
        foreach ($con->group_id as $groupId) {
            unset($groupConMap[$groupId][$con->id]);
            if (empty($groupConMap[$groupId])) {
                unset($groupConMap[$groupId]);
            }
        }
    }
};

Worker::runAll();

服务端启动后,使用留浏览器充当客户端进行测试,打开 F12,在控制台中输入如下代码:

ws = new WebSocket('ws://127.0.0.1:1234');
ws.onmessage = function(data){console.log(data.data)};
ws.onopen = function() {
    ws.send('{"cmd":"add_group", "group_id":"123"}');
    ws.send('{"cmd":"send_to_group", "group_id":"123", "message":"这个是消息"}');
};
请登录后再评论