棘轮 PHP 和长 运行 任务

Ratchet PHP and long running tasks

我正在使用棘轮 php。我是这样开始的:

$loop    = \React\EventLoop\Factory::create();
$webSock = new \React\Socket\Server($loop);

$webSock->listen($this->port, $this->host);

$webServer = new \Ratchet\Server\IoServer(
    new \Ratchet\Http\HttpServer(
        new \Ratchet\WebSocket\WsServer(
            new PusherServer()
        )
    ),
    $webSock
);

return $loop;

现在,在我的 Pusherserver class(实现 MessageComponentInterface)的 onMessage() 中,我想执行一个长时间的阻塞任务。这将是一个 HTTP 请求,最多可能需要十秒才能完成。

如何在执行上一个 HTTP 请求时让 onMessage() 有空处理其他请求?我无法使用 pthreads,因为我无权更改我已经获得的 php 版本(线程安全的)。

这正是您在事件循环中做 任何事情时必须避免的问题:它不能阻塞,因为其他任何人都会尝试订阅或调用消息,或任何其他事件驱动的事件在完成之前不会发生。

这更像是一个架构问题,一旦您找到了最好的方法,接下来就是简化它并确保它适用于您需要的所有任务。

Ratchet 提供了一个 ZMQ 绑定——这很棒,因为一旦你设置了它,你在端口 5555 上收到的任何东西都会在 onYourMethodName() 中触发你的事件循环,或者任何你想调用它的东西!

考虑到这一点,您需要将需要完成的工作发送到作业队列、另一个进程(React 有 child-process extension which I don't particularly like because it's polling in user land as opposed to interrupt-driven I/O like PHP's PCNTL extension)或类似进程。

如果你想 "just get it working",启动需要完成的工作,连同连接 ID 或另一个 ID,这样你就知道响应需要返回给谁,在子进程中和完成后发送出去。这不会阻塞!

如果你想以更好的方式做到这一点,我强烈建议你研究它和它的架构,这样当你再次处理这样的异步问题时,你可以在你的职业生涯中利用这些知识,采用'fire-and-forget' 方法。在事件循环中将需要完成的工作放入作业队列,然后就不用管它了。

你的作业队列可以执行它的东西,当它完成时,通过 ZMQ(正在侦听的端口 5555,记住)返回结果,然后它可以将数据发送回客户端。

关于工作队列的精彩演讲,我强烈推荐来自 PHPNW 的 this one

最后请注意,因为您打开了这个东西并在端口 5555 上侦听数据,所以您可以从 任何地方 发送此数据。它可以是进程间通信,就像您有一个 java 应用程序将数据发送到端口 5555 或其他任何东西一样。它将事物绑定在一起,而不是耦合它们,这在您的架构中很重要。


关于实际使用 ZMQ 的示例,他们在 this page here 上提供了所有内容(如上链接),但我会尝试解释一下发生了什么。

$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onYourMethodName'));

这部分的意思是当anything向5555端口发送数据时,是一个"message"(你可以google其他可用的选项代替message ),它将在您的 $pusher 对象中调用 onYourMethodName。真的就是这么简单。任何超过 5555 的东西,都会命中 $pusher::onYourMethodName

因此,您现在只需要在事件处理程序中创建您的方法(在 onMessage()onSubscribe() 等旁边)...再次在该页面上提到了所有内容。

public function onYourMethodName($data) 
{
    /** You'll probably want to send the data in JSON format **/
    /** Imagine you get through a 'topic' in here... **/
    $data = json_decode($data, true);

    /** You should already have stored the people who are connected, topics etc - see the tutorial **/
    $topic = $this->subscribedTopics[$data['topic']];

    /** Send the data out to everyone subscribed to this topic **/
    $topic->broadcast($data);
}

如果您希望能够将数据发送给特定用户而不是所有人,有很多方法可以做到这一点。看看 this question 我是怎么做到的,但这是很久以前的事了。

您现在唯一需要自己做的是,在您的处理程序中(在 onMessage 或其他)中,实际将需要完成的事情连同应该发送回给谁(主题)一起放入队列中。

在你的 worker 完成它的工作并获取数据后,它需要调用它来点击我上面显示的代码:

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");

$socket->send(json_encode($data));

所以这是你需要做的:

  • 通过侦听端口 5555 获得第一位
  • 在您的事件处理程序等中创建您的方法
  • 暂时不用担心排队或其他任何事情
  • 创建一个非常简单的 php 脚本来执行上述 4 行代码,并证明当您单独 运行 该脚本时,它确实将数据发送到您的事件循环中
  • 然后担心如何将它添加到您的队列并让您的工作人员处理它