二十、Swoole 基础学习笔记 - Swoole WebSocket 消息通知及多端口复合协议的使用

作者: 温新

分类: 【Swoole 系列】

阅读: 1484

时间: 2023-03-13 11:43:14

hi,我是温新,一名PHPer

文章基于 Swoole 5.0.1 版本编写。

**学习目标:通过案例运用 websocket **

说明:本篇文章结合官方文档编写及参考网络资料编写,虽非全部原创,但也是结合了自己的理解,若转载请附带本文 URL,编写不易,持续编写更不易,谢谢!

消息通知案例

功能需求:当用户回复了帖子,就通知发帖的人有新的评论了。以 A、B 用户为例,A 用户是发帖的人,B 用户是评论的人,B 用户评论 A 用户的帖子,就发送消息给 A 用户。

功能分析:

  • 无法保证 A、B 用户都处于在线状态。通过情况发表评论的人在线,也就是 B 用户。A 用户则不一定在线;
  • 一个用户可能对应多个客户端(也就是打开了多个 tab 页),这里把最新的当作有效的 fd;
  • 以参数的形式来说明用户

A 用户:http://localhost/swoole/19-swoole-websocket-clint-1.php?id=1

B 用户:http://localhost/swoole/19-swoole-websocket-clint-1.php?id=2

服务端代码

<?php
// 19-swoole-websocket-server.php
    
class WebSocketServer
{
	private $server = null;
	public $key     = 'ziruchu';
	/**
	 * [
	 * 	用户 ID => fd 
	 * ]
	 * 
	 */
	public $userFds = [];


	public function __construct()
	{
		$this->server = new Swoole\WebSocket\Server('0.0.0.0', 9501);

		$this->server->set([
			'heartbeat_idle_time'      => 60,
			'heartbeat_check_interval' => 10,
		]);

		$this->server->on('Open', [$this, 'onOpen']);
		$this->server->on('Message', [$this, 'onMessage']);
		$this->server->on('Close', [$this, 'onClose']);
	}


	public function onOpen($server, $request)
	{
		// 验证客户端连接
		$accessResult = $this->checkAccess($server, $request);
		if (!$accessResult) {
			return false;
		}

		// 客户度连接加入数组
		if (array_key_exists($request->get['id'], $this->userFds)) {
			$userFd = $this->userFds[$request->get['id']];
			$this->close($userFd, '用户已存在');
			$this->userFds[$request->get['id']] = $request->fd;
			return false;
		} else {
			$this->userFds[$request->get['id']] = $request->fd;
		}
	}

	public function onMessage($server, $frame)
	{
		// {"event":"alertTip", "id": 10}
		$data = json_decode($frame->data, true);

		if (!$data || !is_array($data) || empty($data['event'])) {
			$this->close($frame->fd, '数据格式错误');
			return false;
		}

		$method = $data['event'];
		if (!method_exists($this, $method)) {
			$this->close($frame->fd, '方法不存在');
			return false;
		}

		$this->$method($frame->fd, $data);
	}

	public function onClose($server, $fd)
	{
		echo '关闭连接: ' . $fd . PHP_EOL;
	}

	public function start()
	{
		$this->server->start();
	}

	public function checkAccess($server, $request)
	{
		if (!isset($request->get) || !isset($request->get['id']) || !isset($request->get['token'])) {
			$this->close($request->fd, '访问失败');
			return false;
		}


		$id    = $request->get['id'];
		$token = $request->get['token'];


		if (md5(md5($id). $this->key ) != $token) {
			$this->close($request->fd, 'token 验证失败');
			return false;
		}


		return true;
	}

	/**
	 * 消息提示
	 * 
	 * @param  int $fd   客户端连接标识
	 * @param  array $data 数据
	 * @return [type]       [description]
	 */
	public function alertTip($fd, $data)
	{
		if (empty($data['id']) || !array_key_exists($data['id'], $this->userFds)) {
			return false;
		}

		$userData = [
			'event'   =>	$data['event'],
			'message' => '你有新的回复,注意查看',
		];
		// 发送数据
		$this->push($this->userFds[$data['id']], $userData);
	}

	public function push($fd, $message)
	{
		if (!is_array($message)) {
			$message = [$message];
		}
		$message = json_encode($message);
		// 发送数据
		if ($this->server->push($fd, $message) == false) {
			$this->close($fd);
		}
	}

	/**
	 * 关闭连接
	 * 
	 * @param  int $fd 客户端标识
	 * @param  string $message 消息
	 * @return [type]          [description]
	 */
	public function close($fd, $message = '')
	{
		$this->server->close($fd);
		if ($id = array_search($fd, $this->userFds)) {
			unset($this->userFds[$id]);
		}
	}
}

$server = new WebSocketServer();
$server->start();

客户端代码

<!-- 19-swoole-websocket-clint-1.php -->

<!DOCTYPE html>
<html lang="en">
<head>
	<meta charset="UTF-8">
	<meta name="viewport" content="width=device-width, initial-scale=1.0">
	<title>简版聊天</title>
</head>
<body>
	<?php
		$key = 'ziruchu';
		$id = isset($_GET['id']) ? intval($_GET['id']) : 0;
		$token = md5(md5($id) . $key);
	?>

	<div>
		发送内容:<textarea name="content" id="content" cols="30" rows="10"></textarea><br>
		<!-- 用户 ID -->
		发送给谁:<input type="text" name="id" value="" id="userId"><br>
		<button onclick="send();">发送</button>
	</div>

	<script>
		let ws = new WebSocket("ws://127.0.0.1:9501?id=<?php echo $id ?>&token=<?php echo $token; ?>");
		ws.onopen = function(event) {
		}

		ws.onmessage = function (event) {
			let data = event.data;
			data = eval("("+data+")");
			if (data.event == 'alertTip') {
				alert(data.message);
			}
		}

	    ws.onclose = function(event) {
	        console.log('客户端关闭连接');
	    };

	    function send() {
	        let obj = document.getElementById('content');
	        let content = obj.value;
	        let userId = document.getElementById('userId').value;
	        ws.send('{"event":"alertTip", "id": '+userId+'}');
	    }
	</script>

</body>
</html>

代码说明:

  • 1、服务端 $userFds 属性用于保存 用户 ID 与 fd 标识的映射;
  • 2、open 回调,用于处理验证与添加映射关系;
  • 3、message 回调,用于发送消息;
  • 4、封装的 close 方法用于关闭连接,push 方法用于发送信息。

多端口使用

回顾之前所学的知识,无论是 tcp server、http server等,都是独立的 server 服务,server 与 server 之间没有关联。

现在对上面的需求进行改造,评论审核通过知再发送通知。学习 tcp server 时,若要让 web 应用同 server 服务通信,可以使用 tcp client。现在在 websocket 服务中,加入一个 tcp server 来实现评论审核通过后发送通知。

服务端代码

<?php
// 19-swoole-websocket-server-2.php
class WebSocketServer
{
	private $server = null;
	public $key     = 'ziruchu';
	/**
	 * [
	 * 	用户 ID => fd 
	 * ]
	 * 
	 */
	public $userFds = [];
	private $tcpServer = null;


	public function __construct()
	{
		$this->server = new Swoole\WebSocket\Server('0.0.0.0', 9501);

		$this->server->set([
			'heartbeat_idle_time'      => 60,
			'heartbeat_check_interval' => 10,
		]);

		$this->tcpServer = $this->server->listen('0.0.0.0', 9502, SWOOLE_SOCK_TCP);
		$this->tcpServer->set([
			'open_eof_split' => true,
			'open_eof_check' => true,
			'package_eof'    => "\r\n",
		]);
		$this->tcpServer->on('Receive', [$this, 'onReceive']);

		$this->server->on('Open', [$this, 'onOpen']);
		$this->server->on('Message', [$this, 'onMessage']);
		$this->server->on('Close', [$this, 'onClose']);

	}


	public function onOpen($server, $request)
	{
		// 验证客户端连接
		$accessResult = $this->checkAccess($server, $request);
		if (!$accessResult) {
			return false;
		}

		// 客户度连接加入数组
		if (array_key_exists($request->get['id'], $this->userFds)) {
			$userFd = $this->userFds[$request->get['id']];
			$this->close($userFd, '用户已存在');
			$this->userFds[$request->get['id']] = $request->fd;
			return false;
		} else {
			$this->userFds[$request->get['id']] = $request->fd;
		}
	}

	public function onMessage($server, $frame)
	{
		// {"event":"alertTip", "id": 10}
		$data = json_decode($frame->data, true);

		if (!$data || !is_array($data) || empty($data['event'])) {
			$this->close($frame->fd, '数据格式错误');
			return false;
		}

		$method = $data['event'];
		if (!method_exists($this, $method)) {
			$this->close($frame->fd, '方法不存在');
			return false;
		}

		$this->$method($frame->fd, $data);
	}

	public function onClose($server, $fd)
	{
		echo '关闭连接: ' . $fd . PHP_EOL;
	}

	public function start()
	{
		$this->server->start();
	}

	public function checkAccess($server, $request)
	{
		if (!isset($request->get) || !isset($request->get['id']) || !isset($request->get['token'])) {
			$this->close($request->fd, '访问失败');
			return false;
		}


		$id    = $request->get['id'];
		$token = $request->get['token'];


		if (md5(md5($id). $this->key ) != $token) {
			$this->close($request->fd, 'token 验证失败');
			return false;
		}


		return true;
	}

	/**
	 * 消息提示
	 * 
	 * @param  int $fd   客户端连接标识
	 * @param  array $data 数据
	 * @return [type]       [description]
	 */
	public function alertTip($fd, $data)
	{
		if (empty($data['id']) || !array_key_exists($data['id'], $this->userFds)) {
			return false;
		}

		$userData = [
			'event'   =>	$data['event'],
			'message' => '你有新的回复,注意查看',
		];
		// 发送数据
		$this->push($this->userFds[$data['id']], $userData);
	}

	public function push($fd, $message)
	{
		if (!is_array($message)) {
			$message = [$message];
		}
		$message = json_encode($message);
		// 发送数据
		if ($this->server->push($fd, $message) == false) {
			$this->close($fd);
		}
	}

	/**
	 * 关闭连接
	 * 
	 * @param  int $fd 客户端标识
	 * @param  string $message 消息
	 * @return [type]          [description]
	 */
	public function close($fd, $message = '')
	{
		$this->server->close($fd);
		if ($id = array_search($fd, $this->userFds)) {
			unset($this->userFds[$id]);
		}
	}

	public function onReceive($tcpServer, $fd, $fromId, $data)
	{
		try {
			$data = json_decode($data, true);
			if (!isset($data['event'])) {
				throw new Exception('参数错误', 1);
			}

			$method = $data['event'];
			if (!method_exists($this, $method)) {
				throw new Exception('没有该方法');
			}

			$this->$method($fd, $data);

			return true;
		} catch (Exception $e) {
			throw new Exception($e->getMessage(), 1);
		}
	}
}

$server = new WebSocketServer();
$server->start();

客户端代码

<?php
// 19-swoole-tcp-client.php
class Client
{
	private $client = null;

	public function __construct()
	{
		$this->client = new Swoole\Client(SWOOLE_SOCK_TCP);

		if (!$this->client->connect('127.0.0.1', 9502)) {
			throw new Exception('连接失败');
		}
	}

	public function sendData($data)
	{
		$data = $this->togetherDataByEof($data);
		$this->client->send($data);
	}

	public function togetherDataByEof($data)
	{
		if (!is_array($data)) {
			return false;
		}

		return json_encode($data) . "\r\n";
	}
}

$client = new Client;
$client->sendData([
	'event' => 'alertTip',
	'id' => 1,
]);

websocket 代码任然使用 19-swoole-websocket-client-1.php

浏览器中访问:

http://localhost/swoole/19-swoole-websocket-clint-1.php?id=1
http://localhost/swoole/19-swoole-websocket-clint-1.php?id=2
http://localhost/swoole/19-swoole-tcp-client.php

本篇文章到此结束,我是下篇文章继续学习。

请登录后再评论