React ZMQ,路由器到经销商,不工作。 ZMQ 4.0.5

React ZMQ, Router to Dealer, not working. ZMQ 4.0.5

我想用 React 做一个异步路由器到经销商消息传递,但它不工作。 http://zguide.zeromq.org/php:rtdealer 中的代码有效,但我无法确定我在做什么不同。我正在使用 libzmq 4.0.5

这是我的代码:

$context = new React\ZMQ\Context($loop);

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);
$worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
$worker->connect('tcp://127.0.0.1:5556');
$worker->send('END');

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$i = 0;
$loop->addPeriodicTimer(1, function (React\EventLoop\Timer\Timer $timer) use (&$i, $router) {
    echo 'Time to send!'. PHP_EOL;
    $i++;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

问题是只有经销商发送第一条消息"END"。之后,路由器尝试发送消息,但经销商没有收到消息。

另外,这个函数好像只调用了一次:

// \React\ZMQ\SocketWrapper
public function handleReadEvent()
{
    $messages = $this->socket->recvmulti(ZMQ::MODE_NOBLOCK);
    echo 'Receiving...';    // Added
    var_dump($messages);    // Added

    if ($messages !== false) {
        if (count($messages) === 1) {
            $this->emit('message', array($messages[0]));
        }

        $this->emit('messages', array($messages));
    }
}

输出为:

Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Time to send!
Time to send!
Time to send!
Time to send!
Time to send!
...

编辑:

在dealer连接之前更改了绑定路由器的代码,问题依旧:

$loop = React\EventLoop\Factory::create();

$context = new React\ZMQ\Context($loop);

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$loop->addPeriodicTimer(10, function (React\EventLoop\Timer\Timer $timer) use ($router) {
    echo 'Router sending messages with an interval of 10 seconds'. PHP_EOL;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);

$loop->addPeriodicTimer(5, function (React\EventLoop\Timer\Timer $timer) use ($worker) {
    echo 'After 5 seconds from router binding, connect the dealer and send something'. PHP_EOL;
    $worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
    $worker->connect('tcp://127.0.0.1:5556');
    $worker->send('END');
    $timer->getLoop()->cancelTimer($timer);     // Cancel the timer after connecting
});

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

这是终端输出:

After 5 seconds from router binding, connect the dealer and send something
Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds

我终于解决了这个问题。在@Jason 的评论 Not having used PHP/React 的帮助下,你确定你的命名空间是正确的吗? ,我查看了调用的 send 方法。此方法如下所示:

// \React\ZMQ\SocketWrapper

public function send($message)
{
    echo 'Inside send, sending message'. PHP_EOL; //This line was added by myself
    $this->buffer->send($message);
}

然后,我想:好吧,如果我这样调用这个方法:

$router->send('A', \ZMQ::MODE_SNDMORE);

永远不会传递第二个参数。

然后,我看到sendmulti方法没有在SocketWrapper中实现,__call方法在SocketWrapperclass中实现了直接调用 ZMQ api 中实现的方法 PHP,所以我尝试调用 sendmulti 将这两个参数包装在一个数组中。它起作用了,所以我尝试调用第一个 send 方法传递同一个数组,我不知道为什么但它也起作用了,所以诀窍是调用:

$router->send(array('A', \ZMQ::MODE_SNDMORE));
$router->send(array('END'));

而不是:

$router->send('A', \ZMQ::MODE_SNDMORE);
$router->send('END');

此外,请注意位于 GitHub 存储库中的 ZMQ PHP API,这是错误的。 PhpStorm 以某种方式下载了另一个 ZMQ PHP API 位于某处似乎是正确的,sendmulti 方法原型在 API:

中看起来像这样
public function sendmulti(array $message, $mode = 0)

令人惊讶的是,以下对该方法的调用似乎给出了相同的结果:

$router->send(array('A', \ZMQ::MODE_SNDMORE));
$router->send(array('A'), \ZMQ::MODE_SNDMORE);