八、Swoole 基础学习笔记 - 学习 Swoole Server 异步任务的使用

作者: 温新

分类: 【Swoole 系列】

阅读: 853

时间: 2023-02-19 15:32:34

hi,我是温新,一名PHPer

文章基于 Swoole 5.0 版本编写。

学习目标:学习 Swoole Server 异步任务的使用

说明:本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!

异步任务涉及到 task、finish、taskwait、taskWaitMulit、taskCo,通过本篇文章的学习,掌握这些方法的使用。

简说异步任务

了解异步任务之前,先来了解一下什么是同步任务?同步任务,有着严格的先后顺序,比如去医院看病,只有先挂号,才能去找医生看病。每一个步骤都有着严格的执行顺序。

现在再来看看异步任务,好比你正在和一个美眉正在聊天,聊到嗨时感觉口渴,你拿起手机给小弟打了个电话,买瓶水到这个地址来。而小弟去买水这个动作就是异步任务,水没有送到之前你和美眉该干嘛就干嘛。

简述 Task 进程

Task 进程独立于 Worker 进程,主要用于处理耗时较长的业务逻辑,又不会影响 worker 进程处理其他客户端的请求。worker 进程通过 task() 函数把数据投递到 Task 进程处理。

Task 工作流程

一起来看一下 task 的工作流程是怎样的:

  • 1、异步任务在 worker 进程中使用的 task() 方法发送数据通知 task worker 进程;
  • 2、task worker 进程在 onTask 回调中接收数据并进程处理;
  • 3、task worker 处理完任务后通过 finish() 函数或 return 返回消息给 worker 进程;
  • 4、worker 进程在 onFinish 回调中接收到这些数据并进程处理。

Swoole Task 异步任务

本次任务的将使用 server & client 进程演示。

Task 任务服务端

<?php
// 5-swoole-task-server-1.php
$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS);

$server->set([
    'worker_num' => 2,
    'task_worker_num' => 2,
]);


$server->on('Receive', function ($server, $fd, $reactorId, $data) {
    $server->task($data);
    $server->send($fd, '收到客户端信息,异步任务已投递');
    echo '我是 worker 进程,继续往下走' . PHP_EOL;
});

/**
 * $server Swoole/Server 实例对象
 * $taskId 投递的任务 ID
 * $fromId 来自哪个 worker 进程的 ID
 * $data   要投递的任务数据
 **/
$server->on('Task', function ($server, $taskId, $fromId, $data) {
    echo '来自 ' . $fromId . ' 的异步任务 ' . $taskId . ' 开始执行' . PHP_EOL;
    for ($i = 0; $i < 10; $i++) {
        sleep(1);
        echo '运行异步任务 ' . $i . PHP_EOL;
    }
    return 'task end' . PHP_EOL;
});

/**
 * 该回调只在 task 进程中调用了 finish() 方法或 return 时才会触发
 * 
 * $server Swoole/Server 实例对象
 * $taskId 投递的任务 ID
 * $data   要投递的任务数据
 **/
$server->on('Finish', function ($server, $taskId, $data) {
    echo 'Finish 异步任务执行完成,taskId=' . $taskId . '数据是:' . $data . PHP_EOL;
});

$server->start();

client 客户端

<?php
// 5-swoole-task-client-1.php
$client = new Swoole\Client(SWOOLE_SOCK_TCP);

if (! $client->connect('127.0.0.1', 9501, -1)) {
	exit('connect failed. Error:' . $client->errCode . PHP_EOL);
}

$client->send('hi, server,, 请开始执行异步任务');

echo $client->recv();

$client->close();

客户端的代码不再说明,服务端的代码,只对异步任务这一块代码进行说明,说明如下:

1)开始 task 功能

task 异步任务默认是关闭的,开启 task 要满足 2 个条件,一个是在 set() 中配置 task 进程数;二是注册 task 的回调函数 onTaskonFinish

配置 task 进程数通过 task_worker_num 进行设置,如下:

$server->set([
	'worker_num' => 2,
    // 开启 2 个异步任务进程
	'task_worker_num' => 2,
]);

2)使用 task 异步任务

异步任务在 worker 进程中发起,worker 进程使用 task 函数将任务投递到 task 进程进行处理。

task 函数是非阻塞函数,将任务投递到 task 进程后会立即返回,不影响 worker 进程进行其他操作。但 task 进程是阻塞的,若 task 进程都处于繁忙状态(都在处理任务),此时若有更多的异步任务过来,此时这些新的异步任务只有排队等待空闲的 task 进程才能处理任务,这些新的任务若塞满 task 缓冲区,则会导致 worker 进程阻塞。

若异步任务量一直大于 task 进程的处理能力,需要适当调整 task_worker_number 进程数量。

/**
 * $server Swoole/Server 实例对象
 * $taskId 投递的任务 ID
 * $fromId 来自哪个 worker 进程的 ID
 * $data   要投递的任务数据
 **/
$server->on('Task', function ($server, $taskId, $fromId, $data) {
    echo '来自 ' . $fromId . ' 的异步任务 ' . $taskId . ' 开始执行' . PHP_EOL;
    for ($i = 0; $i < 10; $i++) {
        sleep(1);
        echo '运行异步任务 ' . $i . PHP_EOL;
    }
    return 'task end' . PHP_EOL;
});

/**
 * 该回调只在 task 进程中调用了 finish() 方法或 return 时才会触发
 * 
 * $server Swoole/Server 实例对象
 * $taskId 投递的任务 ID
 * $data   要投递的任务数据
 **/
$server->on('Finish', function ($server, $taskId, $data) {
    echo 'Finish 异步任务执行完成,taskId=' . $taskId . '数据是:' . $data . PHP_EOL;
});

看看输出结果:

$php 5-swoole-task-server-1.php 
我是 worker 进程,继续往下走	# worker 进程输出的信息
来自 1 的异步任务 0 开始执行  # task 进程输出的信息,理解下为什么它输出
运行异步任务 0
运行异步任务 1
运行异步任务 2
运行异步任务 3
运行异步任务 4
运行异步任务 5
运行异步任务 6
运行异步任务 7
运行异步任务 8
运行异步任务 9
Finish 异步任务执行完成,taskId=0数据是:task end

Task 任务切分

上述的案例中是直接投递任务,由 task 自动进行分配。除此之外,还可以指定 task 进程处理。

假设有大数据处理任务,可以将大数据拆分成相对应 task 进程数量的分数,如有 5 个 task 进程,可以将数据拆分成 5 份;

然后可以通过循环将数据投递到指定任务进程进行处理,task 进程数计算方式:(0 - (task_worker_num - 1))

<?php

$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS);

$server->set([
	'worker_num'      => 2,
	'reactor_num'     => 6,
	'task_worker_num' => 5,
	'max_request'     => 1000,
]);

$server->on('Receive', function ($server, $fd, $fromId, $data) {
	// 模拟数据
	$data = [];
	for ($i = 0; $i < 16; $i++) {
		$data[$i] = ['id' => $i, 'name' => '自如初'];
	}

	$server->send($fd, '正在处理数据');

	// worker 进程数
	$taskWorkerNum = 5;
	$count         = count($data);
	// 分块处理数据
	$data          = array_chunk($data, ceil($count / $taskWorkerNum));
	// 分配给不同的进程处理
	foreach ($data as $k => $v) {
		$v['src_worker_id'] = $k;
		$server->task($v, $k);
	}
	echo '继续执行同步任务' . PHP_EOL;
});

$server->on('Task', function ($server, $taskId, $srcWorkerId, $data) {
	echo '消息任务 ' . $taskId . ' 来自于 worker-' . $srcWorkerId . PHP_EOL;
	sleep(3);
	return [
		'succ' => $data['src_worker_id'],
	];
});

$server->on('Finish', function ($server, $taskId, $data) {
	echo 'task:' . $taskId . ' -任务处理完成' . PHP_EOL;
});

$server->start();

对于是否有空闲进程,swoole 没有提供相对应的方法,我们可以根据当前任务是否执行完毕来确定当前进程是否处于空闲状态:

  • 1、投递 task 异步任务时,保存当前的 src_task_id 到数组中,任务投递时的 task_worker 进程 ID 号;
  • 2、任务执行完成后,携带已完成的 src_task_id 到 worker 进程中;
  • 3、worker 进程中维持当前 task 进程的状态。

Task 注意事项

  • 1、运行 task 进程,必须设置 task_worker_num 参数,且必须绑定 onTaskonFinish 回调;

  • 2、task 传递数据大小问题:当数据小于 8k 时,直接通过管道传递,大于 8k 时,写入临时文件传递,onTask 会读取该文件并读取内容;

  • 3、onTask 回调中,若不使用 return 返回,则 onFinish 不会执行;

  • 4、Task 传递对象

    • 4.1、默认 task 只能传递数据,可以将对象序列化传递;
    • 4.2、task 中对象的改变不会反应到 worker 进程中数据库连接;
    • 4.3、网络连接对象不能传递,php 会报错;
  • onFinish 回调:会调用 task 方法的 worker 进程。

本篇文章到此结束,我是温新,下篇文章继续学习。

请登录后再评论