32、Hyperf 3 快速使用 - Hyperf 3 协程理解和使用(重点)
hi,我是温新,一名 PHPer
Hypref 版本:Hyperf 3.0
学习目标:着重理解协程并使用
官方文档讲的太清楚了,请文档吧:https://hyperf.wiki/3.0/#/zh-cn/coroutine
这里只进行简单记录。
协程是什么
协程是一种轻量级的线程,由用户代码来调度和管理,而不是由操作系统内核来进行调度,也就是在用户态进行。
协程与普通线程的区别
相同:协程是一个轻量级的线程,协程和线程都适用于多任务的场景下,从这个角度上来说,协程与线程很相似,都有自己的上下文,可以共享全局变量。
不同:线程同一时间可以运行多个;线程是抢占式资源。Swoole 协程同一时间只能有一个,其它的协程都会处于暂停的状态;协程是协作式的,执行权由用户态自行分配。
协程编程注意事项
- 不能存在阻塞代码;
- 不能通过全局变量存储状态;
- 最大协程数限制。
使用协程
使用协程之前,先来创建一个控制器。
<?php
namespace App\Controller\Test;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\Utils\Coroutine;
#[Controller]
class CoroutineController
{
}
多程方式创建协程
创建协程,可以通过 ``co(callable $callable)或go(callable $callable)函数或Hyperf\Utils\Coroutine::create(callable $callable)`,协程内可以使用协程相关的方法和客户端。
方式一:使用 co
<?php
// App\Controller\Test\CoroutineController.php
use Hyperf\Utils\Coroutine;
    
#[GetMapping('/co/add')]
public function add()
{
    echo '当前协程' . Coroutine::id() . PHP_EOL;
    co(function () {
        echo '手动创建协程' . Coroutine::id() . PHP_EOL;
    });
}
输出结果
当前协程2
手动创建协程3
方式二:使用 go
<?php
// App\Controller\Test\CoroutineController.php
use Hyperf\Utils\Coroutine;
#[GetMapping('/co/add')]
public function add()
{
    echo '当前协程 ' . Coroutine::id() . PHP_EOL;
    go(function () {
        echo '使用 go 函数创建协程,协程 ID ' . Coroutine::id() . PHP_EOL;
    });
}
输出结果
当前协程2
使用 go 函数创建协程,协程 ID 3
方式三:使用 create
<?php
// App\Controller\Test\CoroutineController.php
use Hyperf\Utils\Coroutine;
#[GetMapping('/co/add')]
public function add()
{
    echo '当前协程 ' . Coroutine::id() . PHP_EOL;
    Coroutine::create(function () {
        echo '使用 create 方法创建协程,协程 ID ' . Coroutine::id() . PHP_EOL;
    });
}
输出结果
当前协程 2
使用 create 方法创建协程,协程 ID 3
判断当前环境是否处于协程环境内
Hyperf\Utils\Coroutine::inCoroutine(): bool
#[GetMapping('/co/add')]
public function add()
{
	var_dump(Coroutine::inCoroutine());
}
获取当前协程 ID
Hyperf\Utils\Coroutine::id(): int 
Channel 通道
Channel 主要用于协程间通讯,当我们希望从一个协程里返回一些数据到另一个协程时,就可通过 Channel 来进行传递。
主要方法:
- 
Channel->push:当队列中有其他协程正在等待pop数据时,自动按顺序唤醒一个消费者协程。当队列已满时自动yield让出控制权,等待其他协程消费数据
- 
Channel->pop:当队列为空时自动yield,等待其他协程生产数据。消费数据后,队列可写入新的数据,自动按顺序唤醒一个生产者协程
#[GetMapping('/co/add')]
public function add()
{
    co(function () {
        $channel = new \Swoole\Coroutine\Channel();
        co(function () use ($channel) {
            $message = Coroutine::id() . ' ,来自 parent 的数据';
            $channel->push($message);
        });
        $data = $channel->pop();
        echo Coroutine::id() . ', 收到 parent 的数据:' . $data . PHP_EOL;
    });
}
输出结果
3, 收到 parent 的数据:4 ,来自 parent 的数据
Defer 特性
当我们希望在协程结束时运行一些代码时,可以通过 defer(callable $callable) 函数或 Hyperf\Coroutine::defer(callable $callable) 将一段函数以 栈(stack) 的形式储存起来,栈(stack) 内的函数会在当前协程结束时以 先进后出 的流程逐个执行。
#[GetMapping('/co/add')]
public function add()
{
    echo 'start' . PHP_EOL;
    go(function () {
        echo 'init' . PHP_EOL;
        Coroutine::defer(function () {
            echo "第一个defer".PHP_EOL;
        });
        Coroutine::defer(function(){
            echo "第二个defer".PHP_EOL;
        });
        Coroutine::defer(function(){
            echo "第三个defer".PHP_EOL;
        });
        echo 'defer end'.PHP_EOL;
    });
    echo 'end' . PHP_EOL;
}
输出结果
start
init
defer end
第三个defer
第二个defer
第一个defer
end
WaitGroup 特性
WaitGroup 是基于 Channel 衍生出来的一个特性。在 Hyperf 中,WaitGroup 的用途是使得主协程一直阻塞等待直到所有相关的子协程都已经完成了任务后再继续运行,这里说到的阻塞等待是仅对于主协程(即当前协程)来说的,并不会阻塞当前进程。
#[GetMapping('/co/add')]
public function add()
{
    $wg = new \Hyperf\Utils\WaitGroup();
    // 计数器加二
    $wg->add(2);
    // 创建协程 A
    go(function () use ($wg) {
        mt_srand();
        $time = mt_rand(1, 3);
        sleep($time);
        echo "协程 A 执行完成" . PHP_EOL;
        // 计数器减一
        $wg->done();
    });
    // 创建协程 B
    go(function () use ($wg) {
        mt_srand();
        $time = mt_rand(1, 3);
        sleep($time);
        echo "协程 B 执行完成" . PHP_EOL;
        // 计数器减一
        $wg->done();
    });
    $wg->wait();
    echo "全部程序执行完成" . PHP_EOL;
}
输出结果
协程 A 执行完成
协程 B 执行完成
全部程序执行完成
Paraller 特性
Parallel 特性是 Hyperf 基于 WaitGroup 特性抽象出来的一个更便捷的使用方法。
普通版实现
use Hyperf\Utils\Exception\ParallelExecutionException;
use Hyperf\Utils\Parallel;
#[GetMapping('/co/add')]
public function add()
{
    $parallel = new Parallel();
    $parallel->add(function () {
        mt_srand();
        $time = mt_rand(1, 5);
        sleep($time);
        echo "协程 A 执行完成" . PHP_EOL;
        return Coroutine::id();
    });
    $parallel->add(function () {
        mt_srand();
        $time = mt_rand(1, 3);
        sleep($time);
        echo "协程 B 执行完成" . PHP_EOL;
        return Coroutine::id();
    });
    try {
        $results = $parallel->wait();
        print_r($results);
    } catch (ParallelExecutionException $e) {
        // $e->getResults() 获取协程中的返回值。
        // $e->getThrowables() 获取协程中出现的异常。
    }
}
输出结果
ner listener.
协程 A 执行完成
协程 B 执行完成
Array
(
    [0] => 3
    [1] => 4
)
简化版实现
#[GetMapping('/co/add')]
public function add()
{
    $result = parallel([
        function () {
            mt_srand();
            $time = mt_rand(1, 5);
            sleep($time);
            echo "协程 A 执行完成" . PHP_EOL;
            return Coroutine::id();
        },
        function () {
            mt_srand();
            $time = mt_rand(1, 3);
            sleep($time);
            echo "协程 B 执行完成" . PHP_EOL;
            return Coroutine::id();
        }
    ]);
    print_r($result);
}
限制 parallel 最大同时运行和协程数
#[GetMapping('/co/add')]
public function add()
{
    $parallel = new Parallel(10);
    for ($i = 0; $i < 20; $i++) {
        $parallel->add(function () {
            sleep(1);
            return Coroutine::id();
        });
    }
    try{
        $results = $parallel->wait();
        print_r($results);
    } catch(ParallelExecutionException $e){
        // $e->getResults() 获取协程中的返回值。
        // $e->getThrowables() 获取协程中出现的异常。
    }
}
Concurrent
Hyperf\Utils\Coroutine\Concurrent 基于 Swoole\Coroutine\Channel 实现,用来控制一个代码块内同时运行的最大协程数量的特性。
#[GetMapping('/co/add')]
public function add()
{
    $concurrent = new Coroutine\Concurrent(10);
    for ($i = 0; $i < 15; ++$i) {
        $concurrent->create(function () {
            sleep(1);
            echo Coroutine::id() . '==' . mt_rand() . PHP_EOL;
        });
    }
}
注意输出结果,是分 2 次输出。
协程上下文
use Hyperf\Context\Context;
#[GetMapping('/co/add')]
public function add()
{
    go(function () {
        Context::set('name', '王美丽');
        go(function () {
            // 不会输出 name 的值 
            echo Context::get('name');
            echo Coroutine::id() . PHP_EOL;
        });
        // 输出当前协程中 name 的值
        echo Coroutine::id() . ' ' . Context::get('name');
    });
}
下以是相关方法:
set(string $id, $value)
含义:储存一个值到当前协程的上下文中。
get(string $id, $default = null)
含义:当前协程的上下文中取出一个以 $id 为 key 储存的值。
has(string $id)
含义:判断当前协程的上下文中是否存在以 $id 为 key 储存的值,如存在则返回 true,不存在则返回 false。
override()
当我们需要做一些复杂的上下文处理,比如先判断一个 key 是否存在,如果存在则取出 value 来再对 value 进行某些修改,然后再将 value 设置回上下文容器中,此时会有比较繁杂的判断条件,可直接通过调用 override 方法来实现这个逻辑。
本篇文章结束,我是温新。