在阻塞应用程序中使用反应式 PHP

Using reactive PHP in a blocking application

我目前正在开发一个 PHP 应用程序,该应用程序将使用一些 websocket 连接与其他服务通信。

为了与此 websocket 服务通信,我们使用 Ratchet - 这是一个基于 React PHP.

的 PHP 库

这段代码需要发送和响应几个请求,之后,应该return将信息发送到"main thread"。

示例流程:

HTTP 请求 -> 控制器 -> 启动打开 websocket 客户端的服务 -> websocket 客户端正在与服务器通信 -> 一旦完成,它应该 return 控制器代码的结果 -> 控制器输出给用户

我遇到的问题是我不熟悉 Reactive PHP 并且不确定如何处理。

我试过了;

    $service = new WebsocketService();
    $startTimer = time();
    $service->getList(44);
    while($service->getResponse() == null) {
        usleep(500);
        if (time() > $startTimer + 10) {
            continue; //Timeout on 10 seconds
        }
    }
    var_dump($service->getResponse());

服务代码会在完成后将其 "response" 变量设置为 null 以外的值。这显然失败了,因为 sleep 方法阻塞了线程。同样没有,似乎 while 循环正在阻塞 I/O 并且反应代码失败。

一个解决方案是打开一个新线程和 运行 那里的 websocket 代码,但我不会对此感到满意。

我觉得我需要围绕 websocket 进程实施某种 "watcher",但我不确定该怎么做。

我们的 Websocket 服务客户端代码如下所示;

private $response = null;

/**
 * @return null|object
 */
public function getResponse() {
    return $this->response;
}

public function getList($accountId) {
    $this->response = null;
    \Ratchet\Client\connect('ws://192.168.56.1:8080')->then(function(\Ratchet\Client\WebSocket $conn) use ($accountId) {
        $login = new \stdClass();
        $login->action = 'login';
        $conn->on('message', function($msg) use ($conn, $login, $accountId) {
            try {
                $response = json_decode($msg);
                if ($response->result_id == 100) {
                    //Succesfully logged in to websocket server

                    //Do our request now.
                    $message = new \stdClass();
                    $message->target = 'test';
                    $conn->send(json_encode($message));
                }

                if (isset($response->reply) && $response->reply == 'list') {
                    $this->response = $response; //This is the content I need returned in the controller
                    $conn->close(); //Dont need it anymore
                }

            } catch (\Exception $e) {
                echo 'response exception!';
                //Do nothing for now
            }
        });

        $conn->send(json_encode($login));
    }, function ($e) {
        echo "Could not connect: {$e->getMessage()}\n";
    });
}

运行这样的代码也不行;

    $service = new WebsocketService();
    $service->getList(44);
    echo 'Test';
    var_dump($service->getResponse());

因为 "test" 回声在我收到来自 websocket 服务器的响应之前就出现了。

请赐教!我不确定要搜索什么。

您是否正在使用 WsServer 扩展 class,如果您遇到致命错误,这可能是个问题。我不确定您是否收到致命错误或警告。我还注意到 public 函数 onOpen() 打开了一个连接。请尝试参考此文档 http://socketo.me/api/class-Ratchet.WebSocket.WsServer.html 可能会有用。

PHP 和 websockets 似乎仍处于试验阶段。不过,我在 medium.com 上找到了一篇很棒的教程,由 Adam Winnipass 撰写,这对解决您的问题应该很有帮助:https://medium.com/@winni4eva/php-websockets-with-ratchet-5e76bacd7548

唯一的区别是他们使用 JavaScript 而不是 PHP 来实现他们的 websocket 客户端。但最后应该没有太大区别,因为一旦我们打开每一端的 Websocket 连接,两个应用程序都必须发送并等待接收通知 - 他们是这样说明的:

似乎创建成功的 Websocket 连接的一种可能性是扩展 MessageComponentInterface

use Ratchet\MessageComponentInterface;

这还需要

use Ratchet\ConnectionInterface;

消息组件接口定义了以下方法:

  • 打开
  • onMessage
  • 关闭
  • onError

而且我认为这就是 Ratchet 库实现它的方式。这就是他们最终启动服务器的方式:

use Ratchet\Server\IoServer;
use MyApp\MyCustomMessageComponentInterface;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
require dirname(__DIR__) . '/vendor/autoload.php';
$server = IoServer::factory(
            new HttpServer(
                new WsServer(
                    new MyCustomMessageComponentInterface()
                )
            ),
          8080
         );
$server->run();

使用此架构,您已经可以接收 (onMessage),也可以使用 send() 方法发送。

我无法用您现有的代码解决确切的问题。但我想如果你按预期使用pre-built类和库的接口[=52​​=](并在此处演示)你应该能够实现你想要的将您的代码添加到相应的方法中。

可以在文档中找到更多信息和示例:

http://socketo.me/docs/server

http://socketo.me/api/namespace-Ratchet.html