ZMQ:XPUB 套接字上没有多个订阅者的订阅消息(最后值缓存模式)

ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

我实现了 ZMQ 的最后值缓存 (LVC) 示例 (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching),但无法让第二个订阅者在后端注册。

订阅者第一次加入时,满足event[0] == b'\x01'条件并发送缓存值,但第二个订阅者(相同主题)甚至没有注册(if backend in events:永远不会真实)。其他一切正常。数据从发布者传递到订阅者(全部)。

这可能是什么原因?后端连接方式是否正确?这种模式只适用于第一个订阅者吗?

更新

当我为第二个订阅者订阅另一个主题时,我得到了正确的行为(即订阅时 \x01)。这似乎真的适用于第一个订阅者 onlt 。 ZeroMQ 中有错误吗?

更新 2

这是一个最小的工作示例,它表明 LVC 模式没有工作(至少不是这里实现的方式)。

# subscriber.py
import zmq

def main():
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5558")

    # Subscribe to every single topic from publisher
    print 'subscribing (sub side)'
    sub.setsockopt(zmq.SUBSCRIBE, b"my-topic")

    poller = zmq.Poller()
    poller.register(sub, zmq.POLLIN)
    while True:
        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if sub in events:
            msg = sub.recv_multipart()
            topic, current = msg
            print 'received %s on topic %s' % (current, topic)

if __name__ == '__main__':
    main() 

这里是代理(如示例中所示,但更冗长一些,还有一个集成的发布者)。

# broker.py
# from http://zguide.zeromq.org/py:lvcache
import zmq
import threading
import time


class Publisher(threading.Thread):
    def __init__(self):
        super(Publisher, self).__init__()

    def run(self):
        time.sleep(10)
        ctx = zmq.Context.instance()
        pub = ctx.socket(zmq.PUB)
        pub.connect("tcp://127.0.0.1:5557")

        cnt = 0
        while True:
            msg = 'hello %d' % cnt
            print 'publisher is publishing %s' % msg
            pub.send_multipart(['my-topic', msg])
            cnt += 1
            time.sleep(5)


def main():
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.SUB)
    frontend.bind("tcp://*:5557")
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:5558")

    # Subscribe to every single topic from publisher
    frontend.setsockopt(zmq.SUBSCRIBE, b"")

    # Store last instance of each topic in a cache
    cache = {}

    # We route topic updates from frontend to backend, and
    # we handle subscriptions by sending whatever we cached,
    # if anything:
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)


    # launch a publisher
    p = Publisher()
    p.daemon = True
    p.start()

    while True:

        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if frontend in events:
            msg = frontend.recv_multipart()
            topic, current = msg
            cache[topic] = current
            backend.send_multipart(msg)

        ### this is where it fails for the 2nd subscriber. 
        ### There's never even an event from the backend 
        ### in events when the 2nd subscriber is subscribing.

        # When we get a new subscription we pull data from the cache:
        if backend in events:
            print 'message from subscriber'
            event = backend.recv()
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if event[0] == b'\x01':
                topic = event[1:]
                print ' => subscribe to %s' % topic
                if topic in cache:
                    print ("Sending cached topic %s" % topic)
                    backend.send_multipart([ topic, cache[topic] ])
            elif event[0] == b'\x00':
                topic = event[1:]
                print ' => unsubscribe from %s' % topic

if __name__ == '__main__':
    main()

运行 此代码 (1 x broker.py, 2 x subscriber.py) 显示第一个订阅者按预期在代理处注册(\x01 和缓存查找) ,但是第二个订阅者没有以相同的方式注册。有趣的是,第二个订阅者连接到 pub/sub 频道,一段时间后(10 秒)两个订阅者都从发布者那里收到数据。

这非常难运行ge。也许我的一些图书馆已经过时了。这是我得到的:

Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> zmq.__version__
'14.1.1'

$ brew info zeromq
zeromq: stable 4.0.5 (bottled), HEAD
High-performance, asynchronous messaging library
http://www.zeromq.org/
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) *
  Poured from bottle
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb
==> Dependencies
Build: pkg-config ✔
Optional: libpgm ✘, libsodium ✘

更新 3

这种行为也可以在 zeromq 4.1.2pyzmq-14.7.0(安装或不安装 libpgm 和 libsodium)中观察到。

更新 4

另一项观察表明,第一个订阅者的处理方式有所不同:第一个订阅者是唯一一个以预期方式从 XPUB 套接字 (backend) 取消订阅的订阅者,方法是在其订阅主题之前加上\x00。其他订阅者(我尝试了超过 2 个)在后端频道上保持静音(尽管收到消息)。

更新 5

我希望我不会陷入困境,但我已经研究了 czmq 绑定和 运行 我的 Python C 示例。结果是相同,所以我想这不是绑定问题,而是 libzmq.

我还验证了第二个订阅者正在发送订阅消息,我确实可以在线上看到:

首先订阅:

0000  02 00 00 00 45 00 00 3f  98 be 40 00 40 06 00 00   ....E..? ..@.@...
0010  7f 00 00 01 7f 00 00 01  fa e5 15 b6 34 f0 51 c3   ........ ....4.Q.
0020  05 e4 8b 77 80 18 31 d4  fe 33 00 00 01 01 08 0a   ...w..1. .3......
0030  2a aa d1 d2 2a aa cd e9  00 09 01 6d 79 2d 74 6f   *...*... ...my-to
0040  70 69 63                                           pic              

第 2 条订阅消息(与上面的不同)已标记和解释。在订阅帧中发送相同的数据。

                               identification
                               v
0000  02 00 00 00 45 00 00 3f  ed be 40 00 40 06 00 00   ....E..? ..@.@...
                             src port      sequence number
                                  v        v  v  v  v
0010  7f 00 00 01 7f 00 00 01  fa e6 15 b6 17 da 02 e7   ........ ........

Acknowledgement number   window scaling factor
      v  v  v  v           v
0020  71 4b 33 e6 80 18 31 d5  fe 33 00 00 01 01 08 0a   qK3...1. .3......

timestamp value  timestamp echo reply
            v           v  v   |<-------- data -------
0030  2a aa f8 2c 2a aa f4 45  00 09 01 6d 79 2d 74 6f   *..,*..E ...my-to

      ------>|
0040  70 69 63                                           pic              

我找到了这个问题的解决方案,尽管我从前到后又从前阅读了文档,但我没有看到它。关键是XPUB_VERBOSE。将此行添加到后端初始化之后,一切正常

backend.setsockopt(zmq.XPUB_VERBOSE, True)

这是摘录 from the official documentation:

ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets Sets the XPUB socket behavior on new subscriptions and unsubscriptions. A value of 0 is the default and passes only new subscription messages to upstream. A value of 1 passes all subscription messages upstream.

Option value type int Option value unit 0, 1 Default value 0 Applicable socket types ZMQ_XPUB

Pieter Hintjens 对此有更多信息 in his blog。这是相关部分:

A few months ago we added a neat little option (ZMQ_XPUB_VERBOSE) to XPUB sockets that disables its filtering of duplicate subscriptions. This now works for any number of subscribers. We use this as follows:

void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_set_xpub_verbose (publisher, 1);
zsocket_bind (publisher, "tcp://*:6001");

应更新 LVC 模式说明以反映此设置,否则此模式将不起作用。