Swoole Process多进程实现TCP服务器

作者: 温新

分类: 【高性能PHP】

阅读: 1628

时间: 2021-04-18 13:30:19

PHP可以使用自带的pcntl实现多进程,这里有个简单的小案例,不妨看看,PHP多进程的使用。

下面模拟一个TCP服务器。设置3个子进程,主进程启动之后会启动3个子进程用于处理客户端连接及请求操作。当子进程退出之后,主进程会重新创建子进程,若主进程退出,则子进程在处理完当前请求后退出。

Swoole版本为4.6.4

一、创建多进程TCP服务器

文件:TcpServer.php

<?php
namespace Swoole;

class TcpServer
{
    // 主进程ID
    private $mpid;
    // 子进程
    private $pids = [];
    // 网络套接字
    private $socket;

    // 创建的最大进程数
    const MAX_PROCESS = 3;

    /**
     * 服务器主进程业务逻辑
     */
    public function run()
    {
        // 主进程
        $process = new Process(function () {
            /**
            *posix_getpid() PHP函数。返回当前进程ID
            */
            // 将当前进程作为主进程ID
            $this->mpid = posix_getpid();
            echo time() . " 主进程ID {$this->mpid}\n";

            /**
            *stream_socket_server() PHP函数。创建一个套接字
            */
            // 创建 TCP 服务器并获取套接字
            $this->socket = stream_socket_server("tcp://192.168.172.130:9501", $errno, $errstr);
            if (!$this->socket) {
                exit("Server start error: $errstr --- $errno");
            }

            // 启动子进程处理请求
            for ($i = 0; $i < self::MAX_PROCESS; $i++) {
                $this->startWorkerProcess();
            }

            echo "等待客户端连接...\n";

            // 主进程等待子进程退出,必须是死循环
            while (1) {
                foreach ($this->pids as $k => $pid) {
                    if ($pid) {
                        // 回收结束运行的子进程,以避免僵尸进程出现
                        $ret = Process::wait(false);
                        if ($ret) {
                            echo time() . "工作进程 $pid 退出, 将启动... \n";
                            // 子进程退出后重新启动一个新的子进程
                            $this->startWorkerProcess();
                            unset($this->pids[$k]);
                        }
                    }
                }
                sleep(1); //让出 1s 时间给CPU
            }
        }, false, false); //不启用管道通信
        // 让当前进程变成一个守护进程
        Process::daemon();
        // 执行 fork 系统调用,启动进程
        // 注意:start 之后的变量子进程里面是获取不到的
        $process->start();
    }

    // 创建子进程,接收客户端连接并处理
    private function startWorkerProcess()
    {
        // 子进程
        $process = new Process(function (Process $worker) {
            // 子进程业务逻辑
            $this->acceptClient($worker);
        }, false, false);
        // 启动子进程并获取子进程 ID
        $pid = $process->start();
        $this->pids[] = $pid;
    }

    // 等待客户端连接并处理
    private function acceptClient(&$worker)
    {
        //子进程一直等待客户端连接,不能退出
        while (1) {
            // 从主进程创建的网络套接字上获取连接
            $conn = stream_socket_accept($this->socket, -1);
            // 如果定义了连接建立回调函数,则在连接上执行该回调
            if ($this->onConnect) {
                call_user_func($this->onConnect, $conn);
            }

            // 开始循环读取客户端请求消息
            $recv = ''; // 实际收到的消息
            $buffer = ''; // 缓冲消息
            while (1) {
                // 检查主进程是否正常,不正常则退出子进程
                $this->checkMpid($worker);
                // 读取客户端请求消息
                $buffer = fread($conn, 20);

                // 没有收到正常消息
                if ($buffer === false || $buffer === '') {
                    // 如果服务器设置了连接关闭回调函数,则在当前连接上执行该回调
                    if ($this->onClose) {
                        call_user_func($this->onClose, $conn);
                    }
                    // 结束读取消息,退出当前循环,等待下一个客户端连接
                    break;
                }

                // 消息结束符的位置
                $pos = strpos($buffer, "\n");
                if ($pos === false) {  // 没有读取完,继续读取
                    $recv .= $buffer;
                } else {  // 读取完毕,开始处理请求消息
                    // 处理收到的消息
                    $recv .= trim(substr($buffer, 0, $pos + 1));

                    // 如果服务器定义了消息处理回调函数,则在当前连接上将消息传入回调函数并执行该回调
                    if ($this->onMessage) {
                        call_user_func($this->onMessage, $conn, $recv);
                    }

                    // 如果接收到 quit 消息,表示关闭此连接,等待下一个客户端连接
                    if ($recv == "quit") {
                        echo "Client close connection\n";
                        fclose($conn);
                        break;
                    }

                    $recv = ''; // 清空消息,准备下一次接收
                }
            }
        }
    }

    /**
     * 如果主进程已退出,则子进程也退出,避免孤儿进程出现
     * @param Process $worker
     */
    public function checkMpid(&$worker)
    {
        // 检测主进程是否存在,如果不存在,则退出子进程
        if (!Process::kill($this->mpid, 0)) {
            $worker->exit();
            // 这句提示,实际是看不到的,需要写到日志中
            echo "Master process exited, I [{$worker['pid']}] also quit\n";
        }
    }
}

$server = new TcpServer();

// 定义连接建立回调函数
$server->onConnect = function ($conn) {
    echo "onConnect -- accepted " . stream_socket_get_name($conn, true) . "\n";
};

// 定义收到消息回调函数
$server->onMessage = function ($conn, $msg) {
    echo "onMessage --" . $msg . "\n";
    fwrite($conn, "received " . $msg . "\n");
};

// 定义连接关闭回调函数
$server->onClose = function ($conn) {
    echo "onClose --" . stream_socket_get_name($conn, true) . "\n";
};

// 启动服务器主进程
$server->run();

二、创建协程客户端

关于一些资料,写的时候可能比较早,而swoole是发展的,方法可能会做调整,但也适用。尽管可以适用,但是最好还是与文档保持一致。关于swoole学习多翻阅文档是个好习惯。

文件:TcpClient.php

<?php

use Swoole\Coroutine\Client;
use function Swoole\Coroutine\run;

run(function(){
    $client = new Client(SWOOLE_SOCK_TCP);
    // 连接指定的TCP服务器
    if ($client->connect('192.168.172.130',9501,0.5)) {
        echo "connect failed. Error: {$client->errCode}\n";
    }

    // 建立连接成功后发送消息
    $client->send("hello world\n");
    // 打印接收到的消息
    echo $client->recv() . PHP_EOL;
    sleep(3);

    // 关闭连接
    $client->close();
});

三、启动TCP服务器

启动TCP服务器

php TcpServer.php

查看TCP服务器

ps aux | grep php
root        938  0.0  1.2 129164 12412 ?        Ss   19:26   0:00 php-fpm: master process (/usr/local/php-8.0.1/etc/php-fpm.conf)
www         951  0.0  0.6 129164  6636 ?        S    19:26   0:00 php-fpm: pool www
www         952  0.0  0.6 129164  6640 ?        S    19:26   0:00 php-fpm: pool www
root       2969  0.0  0.7 129044  7112 ?        S    20:46   0:00 php TcpServer.php
root       2970  0.0  0.6 131096  6468 ?        S    20:46   0:00 php TcpServer.php
root       2971  0.0  0.6 131096  6468 ?        S    20:46   0:00 php TcpServer.php
root       2972  0.0  0.6 131096  6960 ?        S    20:46   0:00 php TcpServer.php

可以看到,主进程ID为2969,主进程启动后创建了3个子进程。

四、客户端连接

客户端界面

开启两个窗口进行连接TCP服务器。

窗口一:

php TcpClient.php 
connect failed. Error: 0
received hello world

窗口二:

php TcpClient.php 
connect failed. Error: 0
received hello world

服务器界面窗口

 1618749993 Master process, pid 2969
Waiting client start...
onConnect -- accepted 192.168.172.130:50060
onMessage --hello world
onClose --192.168.172.130:50060
onConnect -- accepted 192.168.172.130:50062
onMessage --hello world
onClose --192.168.172.130:50062

此文学习自 学院君;客户端代码来自于Swoole官方文档。

2021-04-18

请登录后再评论