PUSH/PULL 模式中的丢失消息(Ratchet + PHP + ZeroMQ 推送集成)

Lost messages in PUSH/PULL pattern ( Ratchet + PHP + ZeroMQ push integration )

我正在我的网站上创建聊天、推送通知系统、用户 activity 小部件(即时更新)等

我的网站是建立在 PHP 上的,所以我决定使用 Ratchet 作为我的任务的 websocket 服务器。我已经安装了所有必需的组件,并且学习了 http://socketo.me/docs/push 上的指南并开始编写代码。

这是在 model.php 文件中的 ChatMsg( $item ){...} 方法中。它创建了一个 PUSH 套接字访问点原型,并在数据库中插入新项目后通过 ZeroMQ 向服务器发送带有 JSON 数据的消息:

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");
$socket->send(json_encode($sData));

接下来是我的 push-server.php,只创建一个 PULL 套接字接入点原型并等待新消息,将被传输到推送脚本,向客户端广播新通知、聊天消息和其他事件。

<?php 
require dirname(__DIR__) . '/vendor/autoload.php';

    $loop   = React\EventLoop\Factory::create();
    $pusher = new MyApp\Pusher;

    // Listen for the web server to make a ZeroMQ push after an ajax request
    $context = new React\ZMQ\Context($loop);
    $pull = $context->getSocket(ZMQ::SOCKET_PULL);
    $pull->setSockOpt(ZMQ::SOCKOPT_HWM, 0);
    $pull->bind('tcp://127.0.0.1:5555'); // Binding to 127.0.0.1 means the only client that can connect is itself
    $pull->on('error', function ($e) {
        $f = fopen('push-server-error.log', "a");
        fwrite($f, $e->getMessage()."\n");
        fclose($f);
    });
    $pull->on('message', array($pusher, 'onNewEvent'));

    // Set up our WebSocket server for clients wanting real-time updates
    $webSock = new React\Socket\Server($loop);
    $webSock->listen(8081, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
    $webServer = new Ratchet\Server\IoServer(
        new Ratchet\Http\HttpServer(
            new Ratchet\WebSocket\WsServer(
                new Ratchet\Wamp\WampServer(
                    $pusher
                )
            )
        ),
        $webSock
    );

    $loop->run();
?>

我使用监控工具 Supervisor 成功启动了 push-server.php,我为 WebSocket 流量设置了 NGINX 代理,设置了客户端脚本(高速公路等) ).

一般来说,我打算在生产中全部使用它。第一个小时我在我的网站上修改了新的聊天系统,我测试了它并且一切正常。

但后来我遇到了这个问题。一些 ZeroMQ 消息(只有一部分,可能是 5-10%)在通过 ZeroMQ PUSH 套接字发送后丢失。那时,当 push-server.php 进程启动后发送了大约 300-400 条消息时,就会出现此问题。

我深信这个问题出在 ZeroMQ 内部(而不是在 JS 客户端或具有业务逻辑的 Pusher 脚本内部),因为我试图修改“->on(){...}push-server.php 中的方法以便在终端(控制台)上显示新消息,丢失的消息甚至不会显示在控制台上,即“->on(){...} " 方法赶不上他们。

ZeroMQ“->send()”方法总是returns一个空的ZeroMQ套接字对象,当消息成功发送或丢失时。我只是通过在我的网站上发送聊天消息并获得回复(使用 AJAX 实现的表单提交)来检查这一点:

var_dump($socket->send(json_encode($sData)));

这可能是什么问题,如何解决?

Server OS:      CentOS 6.9 (Final)
PHP version:           5.6.31
ZMQ extension version: 1.1.3
libzmq version:        4.2.2

我创建了 not persistent ZMQ::ContextZMQ::Socket 我的问题已经解决了:

$context = new ZMQContext(1, false);
$socket = $context->getSocket(ZMQ::SOCKET_PUSH);