ZeroMQ (clrzmq4) 轮询问题

ZeroMQ (clrzmq4) polling issue

我想要完成的是实现从两个套接字之一读取消息,无论消息先到达哪里。据我所知,轮询 (zmq_poll) 是正确的做法(如 mspoller in guide 所示)。在这里我将提供小的伪代码片段:

TimeSpan timeout = TimeSpan.FromMilliseconds(50);

using (var receiver1 = new ZSocket(ZContext.Current, ZSocketType.DEALER))
using (var receiver2 = new ZSocket(ZContext.Current, ZSocketType.PAIR))
{
    receiver1.Bind("tcp://someaddress");
    // Note that PAIR socket is inproc:
    receiver2.Connect("inproc://otheraddress");

    var poll = ZPollItem.CreateReceiver();

    ZError error;
    ZMessage msg;

    while (true)
    {
        if (receiver1.PollIn(poll, out msg, out error, timeout))
        {
            // ...
        }

        if (receiver2.PollIn(poll, out msg, out error, timeout))
        {
            // ...
        }
    }
}

如您所见,它实际上与 mspoller in guide.

中的实现完全相同

在我的例子中 receiver2(配对套接字)应该会收到大量消息。事实上,我已经创建了一个测试,其中发送给它的消息数量总是大于它能够接收的消息数量(至少在演示的实现中)。

我已经 运行 测试了 2 秒,我对结果感到非常惊讶:

然后我尝试使用不同的 timeout 值,我发现它会显着影响接收到的消息数量。持续时间(2 秒)和发送的消息数 (180) 保持不变。结果是:

结果告诉我轮询根本行不通。如果轮询正常工作,据我了解该机制,timeout 不应在这种情况下产生任何影响。无论我们将超时设置为 1 小时还是 5 毫秒 - 因为总是有消息要接收,所以没有什么可等待的,所以循环应该以相同的 speed.

我的另一个大问题是,即使 timeout 值非常小,receiver2 也无法接收所有 180 条消息。我在这里努力实现每秒 100 条消息的接收率,尽管我选择了应该非常快的 ZeroMQ(基准提到的数字是每秒 600 万条消息)。

所以我的问题很明显:我在这里做错了什么吗?有没有更好的方法来实现轮询?

通过浏览 clrzmq4 代码,我注意到也可以在套接字 ZPollItems.cs, line 151 的枚举上调用 pollIn 方法,但我没有在任何地方找到任何示例!

这是正确的方法吗?有任何文档吗?

谢谢

我找到了这个问题/解决方案。我们应该在套接字数组上使用 PollIn 方法,而不是分别在每个套接字上使用 PollIn 方法。显然 the example from the guide 极大的误导 。正确的做法是:

TimeSpan timeout = TimeSpan.FromMilliseconds(50);

using (var receiver1 = new ZSocket(ZContext.Current, ZSocketType.DEALER))
using (var receiver2 = new ZSocket(ZContext.Current, ZSocketType.PAIR))
{
    receiver1.Bind("tcp://someaddress");
    receiver2.Connect("inproc://otheraddress");

    // We should "remember" the order of sockets within the array
    // because order of messages in the received array will correspond to it.
    ZSocket[] sockets = { receiver1, receiver2 };

    // Note that we should use two ZPollItem instances:
    ZPollItem[] pollItems = { ZPollItem.CreateReceiver(), ZPollItem.CreateReceiver() };

    ZError error;
    ZMessage[] msg;

    while (true)
    {
        if (sockets.PollIn(pollItems, out msg, out error, timeout))
        {
            if (msg[0] != null)
            {
                // The first message gotten from receiver1
            }

            if (msg[1] != null)
            {
                // The second message gotten from receiver2
            }
        }
    }
}

现在 receiver2 达到每秒 15,000 条接收消息,无论 timeout 值如何,也无论 receiver1.

接收的消息数量如何

更新:来自 clrzmq4 的人已经确认 this issue,所以这个例子可能会很快得到纠正。