在阻塞应用程序中使用反应式 PHP
Using reactive PHP in a blocking application
我目前正在开发一个 PHP 应用程序,该应用程序将使用一些 websocket 连接与其他服务通信。
为了与此 websocket 服务通信,我们使用 Ratchet - 这是一个基于 React PHP.
的 PHP 库
这段代码需要发送和响应几个请求,之后,应该return将信息发送到"main thread"。
示例流程:
HTTP 请求 -> 控制器 -> 启动打开 websocket 客户端的服务 -> websocket 客户端正在与服务器通信 -> 一旦完成,它应该 return 控制器代码的结果 -> 控制器输出给用户
我遇到的问题是我不熟悉 Reactive PHP 并且不确定如何处理。
我试过了;
$service = new WebsocketService();
$startTimer = time();
$service->getList(44);
while($service->getResponse() == null) {
usleep(500);
if (time() > $startTimer + 10) {
continue; //Timeout on 10 seconds
}
}
var_dump($service->getResponse());
服务代码会在完成后将其 "response" 变量设置为 null 以外的值。这显然失败了,因为 sleep 方法阻塞了线程。同样没有,似乎 while 循环正在阻塞 I/O 并且反应代码失败。
一个解决方案是打开一个新线程和 运行 那里的 websocket 代码,但我不会对此感到满意。
我觉得我需要围绕 websocket 进程实施某种 "watcher",但我不确定该怎么做。
我们的 Websocket 服务客户端代码如下所示;
private $response = null;
/**
* @return null|object
*/
public function getResponse() {
return $this->response;
}
public function getList($accountId) {
$this->response = null;
\Ratchet\Client\connect('ws://192.168.56.1:8080')->then(function(\Ratchet\Client\WebSocket $conn) use ($accountId) {
$login = new \stdClass();
$login->action = 'login';
$conn->on('message', function($msg) use ($conn, $login, $accountId) {
try {
$response = json_decode($msg);
if ($response->result_id == 100) {
//Succesfully logged in to websocket server
//Do our request now.
$message = new \stdClass();
$message->target = 'test';
$conn->send(json_encode($message));
}
if (isset($response->reply) && $response->reply == 'list') {
$this->response = $response; //This is the content I need returned in the controller
$conn->close(); //Dont need it anymore
}
} catch (\Exception $e) {
echo 'response exception!';
//Do nothing for now
}
});
$conn->send(json_encode($login));
}, function ($e) {
echo "Could not connect: {$e->getMessage()}\n";
});
}
运行这样的代码也不行;
$service = new WebsocketService();
$service->getList(44);
echo 'Test';
var_dump($service->getResponse());
因为 "test" 回声在我收到来自 websocket 服务器的响应之前就出现了。
请赐教!我不确定要搜索什么。
您是否正在使用 WsServer 扩展 class,如果您遇到致命错误,这可能是个问题。我不确定您是否收到致命错误或警告。我还注意到 public 函数 onOpen() 打开了一个连接。请尝试参考此文档 http://socketo.me/api/class-Ratchet.WebSocket.WsServer.html 可能会有用。
PHP 和 websockets 似乎仍处于试验阶段。不过,我在 medium.com 上找到了一篇很棒的教程,由 Adam Winnipass 撰写,这对解决您的问题应该很有帮助:https://medium.com/@winni4eva/php-websockets-with-ratchet-5e76bacd7548
唯一的区别是他们使用 JavaScript 而不是 PHP 来实现他们的 websocket 客户端。但最后应该没有太大区别,因为一旦我们打开每一端的 Websocket 连接,两个应用程序都必须发送并等待接收通知 - 他们是这样说明的:
似乎创建成功的 Websocket 连接的一种可能性是扩展 MessageComponentInterface
use Ratchet\MessageComponentInterface;
这还需要
use Ratchet\ConnectionInterface;
消息组件接口定义了以下方法:
- 打开
- onMessage
- 关闭
- onError
而且我认为这就是 Ratchet 库实现它的方式。这就是他们最终启动服务器的方式:
use Ratchet\Server\IoServer;
use MyApp\MyCustomMessageComponentInterface;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
require dirname(__DIR__) . '/vendor/autoload.php';
$server = IoServer::factory(
new HttpServer(
new WsServer(
new MyCustomMessageComponentInterface()
)
),
8080
);
$server->run();
使用此架构,您已经可以接收 (onMessage),也可以使用 send() 方法发送。
我无法用您现有的代码解决确切的问题。但我想如果你按预期使用pre-built类和库的接口[=52=](并在此处演示)你应该能够实现你想要的将您的代码添加到相应的方法中。
可以在文档中找到更多信息和示例:
我目前正在开发一个 PHP 应用程序,该应用程序将使用一些 websocket 连接与其他服务通信。
为了与此 websocket 服务通信,我们使用 Ratchet - 这是一个基于 React PHP.
的 PHP 库这段代码需要发送和响应几个请求,之后,应该return将信息发送到"main thread"。
示例流程:
HTTP 请求 -> 控制器 -> 启动打开 websocket 客户端的服务 -> websocket 客户端正在与服务器通信 -> 一旦完成,它应该 return 控制器代码的结果 -> 控制器输出给用户
我遇到的问题是我不熟悉 Reactive PHP 并且不确定如何处理。
我试过了;
$service = new WebsocketService();
$startTimer = time();
$service->getList(44);
while($service->getResponse() == null) {
usleep(500);
if (time() > $startTimer + 10) {
continue; //Timeout on 10 seconds
}
}
var_dump($service->getResponse());
服务代码会在完成后将其 "response" 变量设置为 null 以外的值。这显然失败了,因为 sleep 方法阻塞了线程。同样没有,似乎 while 循环正在阻塞 I/O 并且反应代码失败。
一个解决方案是打开一个新线程和 运行 那里的 websocket 代码,但我不会对此感到满意。
我觉得我需要围绕 websocket 进程实施某种 "watcher",但我不确定该怎么做。
我们的 Websocket 服务客户端代码如下所示;
private $response = null;
/**
* @return null|object
*/
public function getResponse() {
return $this->response;
}
public function getList($accountId) {
$this->response = null;
\Ratchet\Client\connect('ws://192.168.56.1:8080')->then(function(\Ratchet\Client\WebSocket $conn) use ($accountId) {
$login = new \stdClass();
$login->action = 'login';
$conn->on('message', function($msg) use ($conn, $login, $accountId) {
try {
$response = json_decode($msg);
if ($response->result_id == 100) {
//Succesfully logged in to websocket server
//Do our request now.
$message = new \stdClass();
$message->target = 'test';
$conn->send(json_encode($message));
}
if (isset($response->reply) && $response->reply == 'list') {
$this->response = $response; //This is the content I need returned in the controller
$conn->close(); //Dont need it anymore
}
} catch (\Exception $e) {
echo 'response exception!';
//Do nothing for now
}
});
$conn->send(json_encode($login));
}, function ($e) {
echo "Could not connect: {$e->getMessage()}\n";
});
}
运行这样的代码也不行;
$service = new WebsocketService();
$service->getList(44);
echo 'Test';
var_dump($service->getResponse());
因为 "test" 回声在我收到来自 websocket 服务器的响应之前就出现了。
请赐教!我不确定要搜索什么。
您是否正在使用 WsServer 扩展 class,如果您遇到致命错误,这可能是个问题。我不确定您是否收到致命错误或警告。我还注意到 public 函数 onOpen() 打开了一个连接。请尝试参考此文档 http://socketo.me/api/class-Ratchet.WebSocket.WsServer.html 可能会有用。
PHP 和 websockets 似乎仍处于试验阶段。不过,我在 medium.com 上找到了一篇很棒的教程,由 Adam Winnipass 撰写,这对解决您的问题应该很有帮助:https://medium.com/@winni4eva/php-websockets-with-ratchet-5e76bacd7548
唯一的区别是他们使用 JavaScript 而不是 PHP 来实现他们的 websocket 客户端。但最后应该没有太大区别,因为一旦我们打开每一端的 Websocket 连接,两个应用程序都必须发送并等待接收通知 - 他们是这样说明的:
似乎创建成功的 Websocket 连接的一种可能性是扩展 MessageComponentInterface
use Ratchet\MessageComponentInterface;
这还需要
use Ratchet\ConnectionInterface;
消息组件接口定义了以下方法:
- 打开
- onMessage
- 关闭
- onError
而且我认为这就是 Ratchet 库实现它的方式。这就是他们最终启动服务器的方式:
use Ratchet\Server\IoServer;
use MyApp\MyCustomMessageComponentInterface;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
require dirname(__DIR__) . '/vendor/autoload.php';
$server = IoServer::factory(
new HttpServer(
new WsServer(
new MyCustomMessageComponentInterface()
)
),
8080
);
$server->run();
使用此架构,您已经可以接收 (onMessage),也可以使用 send() 方法发送。
我无法用您现有的代码解决确切的问题。但我想如果你按预期使用pre-built类和库的接口[=52=](并在此处演示)你应该能够实现你想要的将您的代码添加到相应的方法中。
可以在文档中找到更多信息和示例: