5、Workerman 基本使用 - Worker 回调属性
hi,我是温新,一名 PHPer
Workerman 中的 Worker
类是整个框架的核心类,它主要负责创建并管理应用服务进程。在 Workerman 框架中,一个 Worker 实例通常代表一个监听特定端口并处理客户端连接的服务进程。
Worker 类的主要作用和功能包括:
-
监听端口:
- 开发者可以通过
new Worker('协议://ip:port')
的方式创建一个 Worker 实例来监听指定的网络协议(如 TCP、UDP 或 WebSocket)以及 IP 和端口号。
- 开发者可以通过
-
多进程管理:
- Worker 支持设置多个子进程 (
$worker->count
) 来同时处理客户端连接,从而实现高并发。 - 可以通过 Worker 类进行进程相关的操作,比如进程间的通信、进程重启等。
- Worker 支持设置多个子进程 (
-
事件驱动编程:
- 提供了丰富的回调函数接口,如
onConnect
(客户端连接时触发)、onMessage
(接收到数据时触发)、onClose
(连接关闭时触发)、onError
(发生错误时触发)等,允许开发者自定义各种网络事件的处理逻辑。
- 提供了丰富的回调函数接口,如
-
守护进程化:
- 提供了将工作进程变为守护进程的方法,使得服务可以在后台长期运行,并且能够在意外退出时自动重启。
-
定时任务:
- 通过 Worker 类可以方便地设置定时任务,例如使用内置的 Timer 组件,能在指定时间间隔内执行某个回调函数。
-
异步I/O支持:
- Worker 内部基于 event-loop 机制实现了异步 I/O,能够有效地利用系统资源,提高应用程序的性能和吞吐量。
-
扩展性:
- Worker 类为开发人员提供了良好的扩展能力,可以根据需要扩展自定义协议或其他组件。
-
状态管理:
- 可以获取当前 Worker 进程的状态信息,包括进程 ID、运行状态等,并进行相应管理。
-
子进程间共享数据:
- 使用全局变量或 Workerman 自带的数据共享机制,在子进程中存储和传递数据。
不先来了解一下构造函数,不然我们就没得玩了,因此,还是先来了解一下构造函数吧。
Worker::__construct
用于初始化一个 Worker 容器实例,其第一个参数是各种协议:tcp、udp、unix、http、websocket、text 及自定义协议。
接下来在案例中演示这些回调属性,这些案例在同一个文件中进行演示,写完一个案例然后清空代码写一下案例。
onWorkerStart
作用:子进程启动时,触发该回调。
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('tcp://0.0.0.0:8888');
// 设置子进程数量
$worker->count = 3;
// Worker 进程启动时触发此回调
$worker->onWorkerStart = function (Worker $worker) {
echo 'worker starting' . PHP_EOL;
};
Worker::runAll();
测试运行
$ php worker-construct.php start
Workerman[worker-construct.php] start in DEBUG mode
------------------------------------------- WORKERMAN -------------------------------------------
Workerman version:4.1.15 PHP version:8.2.0 Event-Loop:\Workerman\Events\Event
-------------------------------------------- WORKERS --------------------------------------------
proto user worker listen processes status
tcp codeing none tcp://0.0.0.0:8888 3 [OK]
-------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
worker starting
worker starting
worker starting
worker starting
输出了 3 次。开启了 3 个进程,因此会输出 3 次。由此可以得出:若开启了 count 个进程,每个子进程运行一次,则总共会运行 count 次。
onConnect
作用:客户端与服务端完成 TCP 三次握手后触发。
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('tcp://0.0.0.0:8888');
// 设置子进程数量
$worker->count = 3;
// 当客户端与服务端成功建立连接时触发的回调函数
$worker->onConnect = function (TcpConnection $connection) {
echo '客户端与服务端握手成功:' . $connection->getRemoteIp() . PHP_EOL;
};
Worker::runAll();
测试运行
$ php worker-construct.php start
Workerman[worker-construct.php] start in DEBUG mode
------------------------------------------- WORKERMAN -------------------------------------------
Workerman version:4.1.15 PHP version:8.2.0 Event-Loop:\Workerman\Events\Event
-------------------------------------------- WORKERS --------------------------------------------
proto user worker listen processes status
tcp codeing none tcp://0.0.0.0:8888 3 [OK]
-------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
客户端与服务端握手成功:127.0.0.1
客户端与服务端握手成功:127.0.0.1
客户端连接
# 客户端 1
$ telnet 127.0.0.1 8888
# 客户端 2
$ telnet 127.0.0.1 8888
- 当客户端与服务端完成握手后触发该回调,每一个连接的客户端都会触发一次这个回调;
- 握手完成之后,此时,服务端无法识别这个连接上来的客户端是谁;
- udp 不会触发此回调和 onClose 回调。
onWorkerReload
作用:重新加载服务时触发此回调。
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('tcp://0.0.0.0:8888');
// 设置子进程数量
$worker->count = 3;
$worker->name = 'workerman';
// Worker 进程启动时触发此回调
$worker->onWorkerStart = function (Worker $worker) {
echo 'worker starting' . PHP_EOL;
};
// Worker 重载时触发此回调
$worker->onWorkerReload = function(Worker $worker)
{
foreach($worker->connections as $connection) {
$connection->send('worker reloading');
}
};
Worker::runAll();
我们开启 4 个终端进行此次测试,1 个用于启动服务,2 个用于连接服务,1 个用于重载服务。
1)终端 1 用于启动服务
$ php worker-construct.php start
Workerman[worker-construct.php] start in DEBUG mode
------------------------------------------- WORKERMAN -------------------------------------------
Workerman version:4.1.15 PHP version:8.2.0 Event-Loop:\Workerman\Events\Event
-------------------------------------------- WORKERS --------------------------------------------
proto user worker listen processes status
tcp codeing workerman tcp://0.0.0.0:8888 3 [OK]
-------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
worker starting
worker starting
worker starting
2)终端 2 和 3 用于连接服务
# 终端 2
$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Connection closed by foreign host.
# 终端 3
$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Connection closed by foreign host.
# 连接后都没有断开服务
4)终端 4 重载服务
# 1、查看服务进程号
$ ps aux | grep '[w]orkerman' | awk '{print $2}'
135435 # 主进程
# 剩下 3 个为子进程
135477
135478
135479
# 重载服务
# 同时注意终端 2 和 3 的变化及 终端 1 的服务
$ php worker-construct.php reload
# 再次查看进程号
$ ps aux | grep '[w]orkerman' | awk '{print $2}'
135435 # 主进程没有发生变化,子进程 ID 全变
135802
135803
135804
重载服务端后,终端 2 和 3 的连接被关闭,此时终端 1 的输出如下:
$ php worker-construct.php start
...
Press Ctrl+C to stop. Start success.
worker starting
worker starting
worker starting
# 进程重载之后的输出信息
Workerman[worker-construct.php] reloading
进程重载
进程重载
worker starting
worker starting
worker starting
从输出有结果,我们可以发现如下信息:
- 重载服务后,子进程是退出重启,因此进程号发生了变化;主进程没有变化;
- 重载服务后,先执行了
onWorkerReload
回调,因此先输出进程重载
而后输出worker starting
, - 重载服务后,会重新调用
onWorkerStart
回调。
如何使用子进程不重启?
设置 reloadable
属性为 false
时,子进程不执行重启。
<?php
...
$worker->name = 'workerman';
// 设置子进程不重启
$worker->reloadable = false;
...
服务重新启动后,使用一个终端来测试
$ ps aux | grep '[w]orkerman' | awk '{print $2}'
139304
139305
139306
139307
# 重载服务
$ php worker-construct.php reload
Workerman[worker-construct.php] reload
# 再次查看进程号
$ ps aux | grep '[w]orkerman' | awk '{print $2}'
139304
139305
139306
139307
onMessage
作用:收到客户端发送过来的数据时触发。
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('tcp://0.0.0.0:8888');
// 设置子进程数量
$worker->count = 3;
// 设置进程名称
$worker->name = 'workerman';
$worker->onMessage = function (TcpConnection $connection, $data) {
echo '收到客户端发送的数据:' . $data;
};
Worker::runAll();
onClose
作用:客户端断开连接时触发。
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('tcp://0.0.0.0:8888');
// 设置子进程数量
$worker->count = 3;
// 设置进程名称
$worker->name = 'workerman';
$worker->onClose = function (TcpConnection $connection) {
echo '客户端:' . $connection->getRemoteIp() . ' 断开连接';
};
Worker::runAll();
注意:若客户端是断网或断电等情况而断开连接时,由于无法及时发送 TCP 的 FIN,因此,服务端无法知道客户端已经断开,也就无法触发 onClose
回调了。对于这种情况,可以采用应用层心跳
解决。
onError
作用:当客户端连接发生错误时触发。
onError
目前有三种错误类型:
- 1、调用Connection::send由于客户端连接断开导致的失败(紧接着会触发onClose回调)
(code:WORKERMAN_SEND_FAIL msg:client closed)
- 2、发送缓冲区已满,但还在发送数据
- 3、使用异步连接失败
发送缓冲区满导致的发送失败
这个案例会持续向客户端发送消息直到发送缓冲区满,然后尝试继续发送消息以触发错误。
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker("tcp://0.0.0.0:8888");
$worker->onConnect = function($connection) {
echo "New connection\n";
// 发送大量数据以填满发送缓冲区
for ($i = 0; $i < 1000; $i++) {
// 发送1MB数据
$connection->send('x' . str_repeat('0', 1024 * 1024));
}
};
$worker->onError = function($connection, $code, $msg) {
echo "错误代码为:" . $code . '; 错误消息:' . $msg . PHP_EOL;
};
Worker::runAll();
测试运行
# 启动服务端
start in DEBUG mode
...
New connection
错误代码为:2; 错误消息:send buffer full and drop package
错误代码为:2; 错误消息:send buffer full and drop package
# 客户端连接
$ telnet 127.0.0.1 8888
AsyncTcpConnection 异步连接失败
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker("tcp://0.0.0.0:8080");
// 当有客户端连接时
$worker->onConnect = function ($connection) {
// 使用 AsyncTcpConnection 进行异步连接
$asyncConnection = new AsyncTcpConnection("tcp://127.0.0.1:8888");
$asyncConnection->onError = function ($conn, $code, $msg) {
echo '异步连接出错:' . $code . ';错误消息:' . $msg . PHP_EOL;
};
$asyncConnection->connect();
};
// 开始运行工作进程
Worker::runAll();
测试
# 服务端
$ php worker-construct.php start
...
Press Ctrl+C to stop. Start success.
异步连接出错:1;错误消息:connect 127.0.0.1:8081 fail after 0.0001 seconds
# 客户端连接
$ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
客户端断开连接
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker("tcp://0.0.0.0:8888");
$worker->onError = function(TcpConnection $connection, $code, $msg) {
echo "error $code $msg\n";
};
Worker::runAll();
onBufferFull
作用:缓冲区被填满时触发
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker("tcp://0.0.0.0:8888");
$worker->onConnect = function(TcpConnection $connection) {
// 设置当前连接发送缓冲区,单位字节
$connection->maxSendBufferSize = 512;
// 发送大量数据以填满发送缓冲区
for ($i = 0; $i < 10; $i++) {
// 发送 1MB 数据
$connection->send('x' . str_repeat('0', 1024 * 1024));
}
};
$worker->onBufferFull = function (TcpConnection $connection) {
echo '缓冲区已满' . PHP_EOL;
};
Worker::runAll();
注意事项看文档,有详细介绍。
onBufferDrain
作用:该回调在应用层发送缓冲区数据全部发送完毕后触发
<?php
/**
* worker-construct.php
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker("tcp://0.0.0.0:8888");
$worker->onConnect = function(TcpConnection $connection) {
$connection->maxSendBufferSize = 1;
for ($i = 0; $i < 10; $i++) {
// 发送 1MB 数据
$connection->send('x' . str_repeat('0', 1024 * 1024));
}
};
$worker->onBufferFull = function($connection) {
echo "Buffer full";
};
$worker->onBufferDrain = function(TcpConnection $connection){
echo "buffer drain and continue send\n";
};
Worker::runAll();