棘轮 PHP 和长 运行 任务
Ratchet PHP and long running tasks
我正在使用棘轮 php。我是这样开始的:
$loop = \React\EventLoop\Factory::create();
$webSock = new \React\Socket\Server($loop);
$webSock->listen($this->port, $this->host);
$webServer = new \Ratchet\Server\IoServer(
new \Ratchet\Http\HttpServer(
new \Ratchet\WebSocket\WsServer(
new PusherServer()
)
),
$webSock
);
return $loop;
现在,在我的 Pusherserver
class(实现 MessageComponentInterface
)的 onMessage()
中,我想执行一个长时间的阻塞任务。这将是一个 HTTP 请求,最多可能需要十秒才能完成。
如何在执行上一个 HTTP 请求时让 onMessage()
有空处理其他请求?我无法使用 pthreads,因为我无权更改我已经获得的 php 版本(线程安全的)。
这正是您在事件循环中做 任何事情时必须避免的问题:它不能阻塞,因为其他任何人都会尝试订阅或调用消息,或任何其他事件驱动的事件在完成之前不会发生。
这更像是一个架构问题,一旦您找到了最好的方法,接下来就是简化它并确保它适用于您需要的所有任务。
Ratchet 提供了一个 ZMQ 绑定——这很棒,因为一旦你设置了它,你在端口 5555 上收到的任何东西都会在 onYourMethodName()
中触发你的事件循环,或者任何你想调用它的东西!
考虑到这一点,您需要将需要完成的工作发送到作业队列、另一个进程(React 有 child-process extension which I don't particularly like because it's polling in user land as opposed to interrupt-driven I/O like PHP's PCNTL extension)或类似进程。
如果你想 "just get it working",启动需要完成的工作,连同连接 ID 或另一个 ID,这样你就知道响应需要返回给谁,在子进程中和完成后发送出去。这不会阻塞!
如果你想以更好的方式做到这一点,我强烈建议你研究它和它的架构,这样当你再次处理这样的异步问题时,你可以在你的职业生涯中利用这些知识,采用'fire-and-forget' 方法。在事件循环中将需要完成的工作放入作业队列,然后就不用管它了。
你的作业队列可以执行它的东西,当它完成时,通过 ZMQ(正在侦听的端口 5555,记住)返回结果,然后它可以将数据发送回客户端。
关于工作队列的精彩演讲,我强烈推荐来自 PHPNW 的 this one。
最后请注意,因为您打开了这个东西并在端口 5555 上侦听数据,所以您可以从 任何地方 发送此数据。它可以是进程间通信,就像您有一个 java 应用程序将数据发送到端口 5555 或其他任何东西一样。它将事物绑定在一起,而不是耦合它们,这在您的架构中很重要。
关于实际使用 ZMQ 的示例,他们在 this page here 上提供了所有内容(如上链接),但我会尝试解释一下发生了什么。
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onYourMethodName'));
这部分的意思是当anything向5555端口发送数据时,是一个"message"(你可以google其他可用的选项代替message ),它将在您的 $pusher
对象中调用 onYourMethodName
。真的就是这么简单。任何超过 5555 的东西,都会命中 $pusher::onYourMethodName
。
因此,您现在只需要在事件处理程序中创建您的方法(在 onMessage()
、onSubscribe()
等旁边)...再次在该页面上提到了所有内容。
public function onYourMethodName($data)
{
/** You'll probably want to send the data in JSON format **/
/** Imagine you get through a 'topic' in here... **/
$data = json_decode($data, true);
/** You should already have stored the people who are connected, topics etc - see the tutorial **/
$topic = $this->subscribedTopics[$data['topic']];
/** Send the data out to everyone subscribed to this topic **/
$topic->broadcast($data);
}
如果您希望能够将数据发送给特定用户而不是所有人,有很多方法可以做到这一点。看看 this question 我是怎么做到的,但这是很久以前的事了。
您现在唯一需要自己做的是,在您的处理程序中(在 onMessage 或其他)中,实际将需要完成的事情连同应该发送回给谁(主题)一起放入队列中。
在你的 worker 完成它的工作并获取数据后,它需要调用它来点击我上面显示的代码:
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");
$socket->send(json_encode($data));
所以这是你需要做的:
- 通过侦听端口 5555 获得第一位
- 在您的事件处理程序等中创建您的方法
- 暂时不用担心排队或其他任何事情
- 创建一个非常简单的 php 脚本来执行上述 4 行代码,并证明当您单独 运行 该脚本时,它确实将数据发送到您的事件循环中
- 然后担心如何将它添加到您的队列并让您的工作人员处理它
我正在使用棘轮 php。我是这样开始的:
$loop = \React\EventLoop\Factory::create();
$webSock = new \React\Socket\Server($loop);
$webSock->listen($this->port, $this->host);
$webServer = new \Ratchet\Server\IoServer(
new \Ratchet\Http\HttpServer(
new \Ratchet\WebSocket\WsServer(
new PusherServer()
)
),
$webSock
);
return $loop;
现在,在我的 Pusherserver
class(实现 MessageComponentInterface
)的 onMessage()
中,我想执行一个长时间的阻塞任务。这将是一个 HTTP 请求,最多可能需要十秒才能完成。
如何在执行上一个 HTTP 请求时让 onMessage()
有空处理其他请求?我无法使用 pthreads,因为我无权更改我已经获得的 php 版本(线程安全的)。
这正是您在事件循环中做 任何事情时必须避免的问题:它不能阻塞,因为其他任何人都会尝试订阅或调用消息,或任何其他事件驱动的事件在完成之前不会发生。
这更像是一个架构问题,一旦您找到了最好的方法,接下来就是简化它并确保它适用于您需要的所有任务。
Ratchet 提供了一个 ZMQ 绑定——这很棒,因为一旦你设置了它,你在端口 5555 上收到的任何东西都会在 onYourMethodName()
中触发你的事件循环,或者任何你想调用它的东西!
考虑到这一点,您需要将需要完成的工作发送到作业队列、另一个进程(React 有 child-process extension which I don't particularly like because it's polling in user land as opposed to interrupt-driven I/O like PHP's PCNTL extension)或类似进程。
如果你想 "just get it working",启动需要完成的工作,连同连接 ID 或另一个 ID,这样你就知道响应需要返回给谁,在子进程中和完成后发送出去。这不会阻塞!
如果你想以更好的方式做到这一点,我强烈建议你研究它和它的架构,这样当你再次处理这样的异步问题时,你可以在你的职业生涯中利用这些知识,采用'fire-and-forget' 方法。在事件循环中将需要完成的工作放入作业队列,然后就不用管它了。
你的作业队列可以执行它的东西,当它完成时,通过 ZMQ(正在侦听的端口 5555,记住)返回结果,然后它可以将数据发送回客户端。
关于工作队列的精彩演讲,我强烈推荐来自 PHPNW 的 this one。
最后请注意,因为您打开了这个东西并在端口 5555 上侦听数据,所以您可以从 任何地方 发送此数据。它可以是进程间通信,就像您有一个 java 应用程序将数据发送到端口 5555 或其他任何东西一样。它将事物绑定在一起,而不是耦合它们,这在您的架构中很重要。
关于实际使用 ZMQ 的示例,他们在 this page here 上提供了所有内容(如上链接),但我会尝试解释一下发生了什么。
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onYourMethodName'));
这部分的意思是当anything向5555端口发送数据时,是一个"message"(你可以google其他可用的选项代替message ),它将在您的 $pusher
对象中调用 onYourMethodName
。真的就是这么简单。任何超过 5555 的东西,都会命中 $pusher::onYourMethodName
。
因此,您现在只需要在事件处理程序中创建您的方法(在 onMessage()
、onSubscribe()
等旁边)...再次在该页面上提到了所有内容。
public function onYourMethodName($data)
{
/** You'll probably want to send the data in JSON format **/
/** Imagine you get through a 'topic' in here... **/
$data = json_decode($data, true);
/** You should already have stored the people who are connected, topics etc - see the tutorial **/
$topic = $this->subscribedTopics[$data['topic']];
/** Send the data out to everyone subscribed to this topic **/
$topic->broadcast($data);
}
如果您希望能够将数据发送给特定用户而不是所有人,有很多方法可以做到这一点。看看 this question 我是怎么做到的,但这是很久以前的事了。
您现在唯一需要自己做的是,在您的处理程序中(在 onMessage 或其他)中,实际将需要完成的事情连同应该发送回给谁(主题)一起放入队列中。
在你的 worker 完成它的工作并获取数据后,它需要调用它来点击我上面显示的代码:
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");
$socket->send(json_encode($data));
所以这是你需要做的:
- 通过侦听端口 5555 获得第一位
- 在您的事件处理程序等中创建您的方法
- 暂时不用担心排队或其他任何事情
- 创建一个非常简单的 php 脚本来执行上述 4 行代码,并证明当您单独 运行 该脚本时,它确实将数据发送到您的事件循环中
- 然后担心如何将它添加到您的队列并让您的工作人员处理它