PHP/Ratchet websocket - while 循环问题

PHP/Ratchet websocket - issues with while loop

我有一个使用 PHP 和 Ratchet 库的非常简单的 websocket。

当用户打开特定页面时,它会将用户 ID 发送到我的套接字,它应该更新该用户的状态(目前我只是在控制台中记录它),如下所示:

<input type="hidden" value="'.$account_id.'" id="account_id">
<input type="hidden" value="trial" id="request_type">
<script>
$(document).ready(function(){
    var conn = new WebSocket('ws://127.0.0.1:8080');

    conn.onopen = function(e){
        console.log("Connection Opened!");
        var account_id = $("#account_id").val();
        var request_type = $("#request_type").val();
        var data = {account_id: account_id, request_type: request_type};
        conn.send(JSON.stringify(data));
    }
    conn.onclose = function(e){
        console.log("Connection Closed!");
    }
    conn.onmessage = function(e) {
        var data = JSON.parse(e.data);
        console.log(data);
    };
    conn.onerror = function(e){
        var data = JSON.parse(e.data);
        console.log(data);
    }
})
</script>

那么我的socket脚本如下:

set_time_limit(0);

use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
require dirname(__DIR__) . '../vendor/autoload.php';

class socket implements MessageComponentInterface{
    protected $clients;

    public function __construct(){
        $this->clients = new \SplObjectStorage;
        echo 'Server Started.'.PHP_EOL;
    }

    public function onOpen(ConnectionInterface $socket){
        $this->clients->attach($socket);
        echo 'New connection '.$socket->resourceId.'!'.PHP_EOL;
    }
    public function onClose(ConnectionInterface $socket) {
        $this->clients->detach($socket);
        echo 'Connection '.$socket->resourceId.' has disconnected'.PHP_EOL;
    }
    public function onError(ConnectionInterface $socket, \Exception $e) {
        echo 'An error has occurred: '.$e->getMessage().'!'.PHP_EOL;
        $socket->close();
    }
    public function onMessage(ConnectionInterface $from, $json){
        echo 'Connection '.$from->resourceId.' sent '.$json.PHP_EOL;
        $data = json_decode($json, true);
        $account_id = $data['account_id'];
        $request_type = $data['request_type'];

        try {
            $conn = new PDO("mysql:host=".$db_host.";port:".$db_port.";dbname=".$db_name."", $db_user, $db_pass);
            $conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        }catch(PDOException $e){
            echo $e->getMessage();
        }
        
        foreach ($this->clients as $client) {
            if ($from->resourceId == $client->resourceId) {
                if($request_type == 'trial'){
                    // while(true){
                        $response_array= [];
                        $stmt = $conn->prepare("SELECT * FROM table WHERE account_id=:account_id AND last_status_change=now()");
                        $stmt->bindParam(':account_id', $account_id);
                        $stmt->execute();
                        $result = $stmt->setFetchMode(PDO::FETCH_ASSOC);
                        foreach($stmt->fetchAll() as $key=>$value) {
                            $response_array[$key] = $value;
                        }
                        if(!empty($response_array)){
                            foreach($response_array as $item){
                                $status = $item['status'];
                            }
                            $response = array(
                                'account_id' => $account_id,
                                'status' => $status
                            );
                            var_dump($response);
                            $client->send(json_encode($response));
                        }
                        // sleep(5);
                    // }
                }
            }
        }
    }
}

$server = IoServer::factory(
    new HttpServer(
        new WsServer(
            new socket()
        )
    ),
    8080
);
$server->run();

就目前而言,它按预期工作,但只有在页面加载时状态发生变化时才会显示当前状态,我会在控制台中看到状态,只要我取消注释 while() 循环实际继续检查更新状态,当状态发生变化但客户端没有任何记录时,我的套接字将在命令行中执行结果的 var_dump()

我是 websockets 的新手,我一直在通过在 JS 中有一个间隔来进行长轮询,该间隔将 fetch() 发送到 PHP 脚本,该脚本获得了最新的数据库结果,但它不是它非常高效,当大量客户端处于活动状态并不断向文件发出请求时会导致问题,这反过来又会减慢数据库的速度。所以我不确定为什么 while() 循环会像这样影响它,或者我是否以正确的方式进行此操作。

将此行 if ($from->resourceId == $client->resourceId) { 替换为 if ($from == $client) { 此更改可能看起来很简单,但在 php ratchet 提供的聊天示例 class 中,以避免将消息发送到sender 他们有条件向除了sender之外的客户端发送消息,他们这样比较if ($from == $client) {不仅是一个resourceId整个对象本身!

while 循环不是它的工作原理。它会阻塞东西并无限且不必要地消耗资源。

你要的是addPeriodicTimer().

定期检查需要更新的客户端。

像这样添加到你的引导程序中:

$reactEventLoop->addPeriodicTimer(5, function() use $messageHandler, $server {
    // Fetch all changed clients at once and update their status
    $clientsToUpdate = getUpdatedClients($server->app->clients);
    foreach ($clientsToUpdate as $client) {
        $client->send(json_encode($response));
    }
});

这比任何其他方法都要轻量级得多,您可以做到

  1. 使用单个准备好的数据库查询获取 N 个客户状态
  2. 仅定期更新更改的客户端
  3. 不要将您的应用置于阻塞状态

Whosebug 上的其他资源将帮助您找到合适的位置:

How do I access the ratchet php periodic loop and client sending inside app?

您应该使用 Ratchet 中的 addPeriodicTimer,尽管您必须制作 $clients public 才能放置计时器。 也许你可以将它放在 class 中并且仍然是私有的,但我不确定它是否可以为每个客户端启动一个计时器。

无论如何,如您所见,您可以创建另一个 public 函数,它实际上会在周期性计时器中完成工作(就像 while 循环一样) 然后在客户端连接后调用它,并在 timerloop 中多次调用它, 为此,我还创建了一个 public account_ids 来保存帐户 ID

试一试,告诉我

use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
require dirname(__DIR__) . '../vendor/autoload.php';

class socket implements MessageComponentInterface{
    public $clients;
    public $account_ids;

    public function __construct(){
        $this->clients = new \SplObjectStorage;
        echo 'Server Started.'.PHP_EOL;
    }

    public function onOpen(ConnectionInterface $socket){
        $this->clients->attach($socket);
        echo 'New connection '.$socket->resourceId.'!'.PHP_EOL;
    }
    public function onClose(ConnectionInterface $socket) {
        $this->clients->detach($socket);
        echo 'Connection '.$socket->resourceId.' has disconnected'.PHP_EOL;
    }
    public function onError(ConnectionInterface $socket, \Exception $e) {
        echo 'An error has occurred: '.$e->getMessage().'!'.PHP_EOL;
        $socket->close();
    }
    public function onMessage(ConnectionInterface $from, $json){
        echo 'Connection '.$from->resourceId.' sent '.$json.PHP_EOL;
        $data = json_decode($json, true);
        $account_id = $data['account_id'];
        $request_type = $data['request_type'];
        foreach ( $this->clients as $client ) {
            if ( $from->resourceId == $client->resourceId ) {
                if( $request_type == 'trial'){
                    $this->account_ids[$client->resourceId] = $account_id;
                    $this->checkStatus($client, $account_id);
                }
            }
        }
    }
    public function checkStatus($client, $account_id){
        try {
            $conn = new PDO("mysql:host=".$db_host.";port:".$db_port.";dbname=".$db_name."", $db_user, $db_pass);
            $conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        }catch(PDOException $e){
            echo $e->getMessage();
        }
        $response_array= [];
        $stmt = $conn->prepare("SELECT * FROM table WHERE account_id=:account_id AND last_status_change=now()");
        $stmt->bindParam(':account_id', $account_id);
        $stmt->execute();
        $result = $stmt->setFetchMode(PDO::FETCH_ASSOC);
        foreach($stmt->fetchAll() as $key=>$value) {
            $response_array[$key] = $value;
        }
        if ( !empty($response_array) ) {
            foreach($response_array as $item){
                $status = $item['status'];
            }
            $response = array(
                'account_id' => $account_id,
                'status' => $status
            );
            var_dump($response);
            $client->send(json_encode($response));
        }
    }
}

$socket = new socket();
$server = IoServer::factory(
    new HttpServer(
        new WsServer(
            $socket
        )
    ),
    8080
);
$server->loop->addPeriodicTimer(5, function () use ($socket) {
    foreach($socket->clients as $client) {
        echo "Connection ".$client->resourceId." check\n";
        $socket->checkStatus($client, $socket->account_ids[$client->resourceId]);
    }
});


$server->run();