九、Swoole 基础学习笔记 - Swoole taskWaitMulti 与 taskCo 并发执行任务

作者: 温新

分类: 【Swoole 系列】

阅读: 781

时间: 2023-02-19 15:53:41

hi,我是温新,一名 PHPer

文章基于 Swoole 5.0.1 版本编写。

**学习目标:学习并发执行异步任务 **

说明:本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!

taskWaitMulti

含义:并发执行多个 task 异步任务。

$server->taskWaitMulti(array $tasks, float $time = 0.5): bool|array;

参数:

  • $tasks:必须为数字索引数组,不支持关联索引数组,底层会遍历 $tasks 将任务逐个投递到 Task 进程;
  • $time:为浮点型,单位为秒

返回值:

  • 任务完成或超时,返回结果数组。结果数组中每个任务结果的顺序与 $tasks 对应,如:$tasks[2] 对应的结果为 $result[2]
  • 某个任务执超时不会影响其他任务,返回的结果数据中将不包含超时的任务。

最大并发任务不得超过 1024

<?php
// 9-swoole-server-taskwaitmulti.php
    
$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS);

$server->set([
    'worker_num'      => 3,
    'task_worker_num' => 3,
]);

$server->on('Receive', function ($server, $fd, $reactorId, $data) {
    $tasks[] = mt_rand(1000, 9999); //任务1
    $tasks[] = mt_rand(1000, 9999); //任务2
    $tasks[] = mt_rand(1000, 9999); //任务3

    //等待所有Task结果返回,超时为10s
    $results = $server->taskWaitMulti($tasks, 10.0);
    if (isset($results[0])) {
        echo "任务2的执行结果为{$results[0]}\n";
    }
    if (isset($results[1])) {
        echo "任务2的执行结果为{$results[1]}\n";
    }
    if (isset($results[2])) {
        echo "任务3的执行结果为{$results[2]}\n";
    }
});

$server->on('Task', function ($server, $taskId, $srcWorkerId, $data) {
    sleep(2);
    return 'success ' . $data;
});

$server->on('Finish', function ($server, $taskId, $data) {
});

$server->start();

执行结果:

$php 9-swoole-server-taskwaitmulti.php 
任务2的执行结果为success 8296
任务2的执行结果为success 5140
任务3的执行结果为success 5485

当任务数量大于异步 worker_num 数量时,需要排队等待,如现在有 3 个 task_worker,有 4 个任务,则第 4 个任务需要等待前三个完成后才会执行。

taskCo

含义:并发执行 Task 并进程协程调度。

$server->taskCo(array $tasks, float $timeout = 0.5): array

参数:

  • $tasks 任务列表,必须为数组。底层会遍历数组,将每个元素作为 task 投递到 Task 进程池;
  • $timeout 超时时间,默认为 0.5 秒,当规定的时间内任务没有全部完成,立即中止并返回结果;

返回值:

  • 任务完成或超时,返回结果数组。结果数组中每个任务结果的顺序与 $tasks 对应,如:$tasks[2] 对应的结果为 $result[2]
  • 某个任务执行失败或超时,对应的结果数组项为 false,如:$tasks[2] 失败了,那么 $result[2] 的值为 false

调度过程:

  • $tasks 列表中的每个任务会随机投递到一个 Task 工作进程,投递完毕后,yield 让出当前协程,并设置一个 $timeout 秒的定时器;
  • onFinish 中收集对应的任务结果,保存到结果数组中。判断是否所有任务都返回了结果,如果为否,继续等待。如果为是,进行 resume 恢复对应协程的运行,并清除超时定时器国;
  • 在规定的时间内任务没有全部完成,定时器先触发,底层清除等待状态。将未完成的任务结果标记为 false,立即 resume 对应协程。
<?php
//9-swoole-server-taskco.php

$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS);

$server->set([
    'worker_num'      => 3,
    'task_worker_num' => 2
]);

$server->on('Receive', function ($server, $fd, $reactorId, $data) {

    $tasks = [1,2,3];

    //等待所有Task结果返回,超时为10s
    $results = $server->taskCo($tasks, 5);

    print_r($results);
});

$server->on('Task', function ($server, $taskId, $srcWorkerId, $data) {
    sleep(3);
    echo 'task ' . $data . PHP_EOL;
    return 'success ' . $data;
});

$server->on('Finish', function ($server, $taskId, $data) {
});

$server->start();

输出结果:

$php 9-swoole-server-taskco.php 
task 1
task 2
Array
(
    [0] => success 1
    [1] => success 2
    [2] => 
)
# 任务超时后输出了结果
task 3
# 任务已超时
[2023-01-03 18:54:44 *24196.2]  WARNING php_swoole_server_onFinish() (ERRNO 2003): task[2] has expired

代码解析:

  • task_worker 设置为 2 个进程;
  • 有 3 个 task 任务,任务超时时间为 5 秒;
  • 每个异步任务会有 3 秒的处理时间;
  • 2 个task_work 处理 3 个异步任务,每个任务要 3 秒,那么有一个任务一定会超时;
  • 当有一个任务超时时,立即返回处理结果,但是超时的任务还在处理,也就输出了 task 3

本篇文章到此结束,我是温新,下篇文章继续学习。

请登录后再评论