九、Swoole 基础学习笔记 - Swoole taskWaitMulti 与 taskCo 并发执行任务
hi,我是温新,一名 PHPer
文章基于 Swoole 5.0.1 版本编写。
**学习目标:学习并发执行异步任务 **
说明:本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!
taskWaitMulti
含义:并发执行多个 task 异步任务。
$server->taskWaitMulti(array $tasks, float $time = 0.5): bool|array;
参数:
- $tasks:必须为数字索引数组,不支持关联索引数组,底层会遍历
$tasks
将任务逐个投递到 Task 进程; - $time:为浮点型,单位为秒
返回值:
- 任务完成或超时,返回结果数组。结果数组中每个任务结果的顺序与
$tasks
对应,如:$tasks[2]
对应的结果为$result[2]
; - 某个任务执超时不会影响其他任务,返回的结果数据中将不包含超时的任务。
最大并发任务不得超过
1024
。
<?php
// 9-swoole-server-taskwaitmulti.php
$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS);
$server->set([
'worker_num' => 3,
'task_worker_num' => 3,
]);
$server->on('Receive', function ($server, $fd, $reactorId, $data) {
$tasks[] = mt_rand(1000, 9999); //任务1
$tasks[] = mt_rand(1000, 9999); //任务2
$tasks[] = mt_rand(1000, 9999); //任务3
//等待所有Task结果返回,超时为10s
$results = $server->taskWaitMulti($tasks, 10.0);
if (isset($results[0])) {
echo "任务2的执行结果为{$results[0]}\n";
}
if (isset($results[1])) {
echo "任务2的执行结果为{$results[1]}\n";
}
if (isset($results[2])) {
echo "任务3的执行结果为{$results[2]}\n";
}
});
$server->on('Task', function ($server, $taskId, $srcWorkerId, $data) {
sleep(2);
return 'success ' . $data;
});
$server->on('Finish', function ($server, $taskId, $data) {
});
$server->start();
执行结果:
$php 9-swoole-server-taskwaitmulti.php
任务2的执行结果为success 8296
任务2的执行结果为success 5140
任务3的执行结果为success 5485
当任务数量大于异步 worker_num 数量时,需要排队等待,如现在有 3 个 task_worker,有 4 个任务,则第 4 个任务需要等待前三个完成后才会执行。
taskCo
含义:并发执行 Task 并进程协程调度。
$server->taskCo(array $tasks, float $timeout = 0.5): array
参数:
-
$tasks
任务列表,必须为数组。底层会遍历数组,将每个元素作为task
投递到Task
进程池; -
$timeout
超时时间,默认为0.5
秒,当规定的时间内任务没有全部完成,立即中止并返回结果;
返回值:
- 任务完成或超时,返回结果数组。结果数组中每个任务结果的顺序与
$tasks
对应,如:$tasks[2]
对应的结果为$result[2]
; - 某个任务执行失败或超时,对应的结果数组项为
false
,如:$tasks[2]
失败了,那么$result[2]
的值为false
。
调度过程:
-
$tasks
列表中的每个任务会随机投递到一个Task
工作进程,投递完毕后,yield
让出当前协程,并设置一个$timeout
秒的定时器; - 在
onFinish
中收集对应的任务结果,保存到结果数组中。判断是否所有任务都返回了结果,如果为否,继续等待。如果为是,进行resume
恢复对应协程的运行,并清除超时定时器国; - 在规定的时间内任务没有全部完成,定时器先触发,底层清除等待状态。将未完成的任务结果标记为
false
,立即resume
对应协程。
<?php
//9-swoole-server-taskco.php
$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS);
$server->set([
'worker_num' => 3,
'task_worker_num' => 2
]);
$server->on('Receive', function ($server, $fd, $reactorId, $data) {
$tasks = [1,2,3];
//等待所有Task结果返回,超时为10s
$results = $server->taskCo($tasks, 5);
print_r($results);
});
$server->on('Task', function ($server, $taskId, $srcWorkerId, $data) {
sleep(3);
echo 'task ' . $data . PHP_EOL;
return 'success ' . $data;
});
$server->on('Finish', function ($server, $taskId, $data) {
});
$server->start();
输出结果:
$php 9-swoole-server-taskco.php
task 1
task 2
Array
(
[0] => success 1
[1] => success 2
[2] =>
)
# 任务超时后输出了结果
task 3
# 任务已超时
[2023-01-03 18:54:44 *24196.2] WARNING php_swoole_server_onFinish() (ERRNO 2003): task[2] has expired
代码解析:
- task_worker 设置为 2 个进程;
- 有 3 个 task 任务,任务超时时间为 5 秒;
- 每个异步任务会有 3 秒的处理时间;
- 2 个task_work 处理 3 个异步任务,每个任务要 3 秒,那么有一个任务一定会超时;
- 当有一个任务超时时,立即返回处理结果,但是超时的任务还在处理,也就输出了
task 3
。
本篇文章到此结束,我是温新,下篇文章继续学习。
请登录后再评论