Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World

Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World

我刚开始使用 ZeroMQ,我正在尝试让 Hello World 与 Python 3.6 中的 PyZMQ 和 asyncio 一起工作。我正在尝试将模块的功能与 pub/sub 代码分离,因此有以下 class 设置:

编辑 1:最小化示例

编辑 2:包含的解决方案,请参阅下面的答案。

import asyncio
import zmq.asyncio
from zmq.asyncio import Context

# manages message flow between publishers and subscribers
class HelloWorldMessage:
    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.pub_hello_world(),
            self.sub_hello_world(),
        ]))

    # generates message "Hello World" and publish to topic 'world'
    async def pub_hello_world(self):
        pub = self.ctx.socket(zmq.PUB)
        pub.connect(self.url)

        # message contents
        msg = "Hello World"
        print(msg)

        # keep sending messages
        while True:
            # --MOVED-- slow down message publication
            await asyncio.sleep(1) 

            # publish message to topic 'world'
            # async always needs `send_multipart()`
            await pub.send_multipart([b'world', msg.encode('ascii')])  # WRONG: bytes(msg)

    # processes message "Hello World" from topic 'world'
    async def sub_hello_world(self):
        sub = self.ctx.socket(zmq.SUB)
        sub.bind(self.url)
        sub.setsockopt(zmq.SUBSCRIBE, b'world')

        # keep listening to all published message on topic 'world'
        while True:
            msg = await sub.recv_multipart()
            # ERROR: WAITS FOREVER
            print('received: ', msg)

if __name__ == '__main__':
    HelloWorldMessage()

问题

使用上面的代码只打印 1 Hello World 然后永远等待。如果我按 ctrl+c,我会收到以下错误:

python helloworld_pubsub.py

Hello World
^CTraceback (most recent call last):
  File "helloworld_pubsub_Whosebug.py", line 64, in <module>
    HelloWorldMessage()
  File "helloworld_pubsub_Whosebug.py", line 27, in __init__
    self.sub_hello_world(),
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
    event_list = self._selector.select(timeout)
  File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

版本:libzmq: 4.2.3pyzmq: 17.0.0Ubuntu 16.04

如有任何见解,我们将不胜感激。

De-coupling 对于 OOP 关注点分离很好,但是
我们还要注意调试代码:

1) ZeroMQ PUB/SUB 可扩展的正式通信原型已为 require some time before PUB/SUB-s get indeed ready 所知多年,以便广播/接受消息。因此,人们应该更愿意在 .__init__() 内最好地设置基础设施,而不是在 SUB-s 应该已经收到一些有效载荷

在我看来,这将是一种更安全的设计方法:

class HelloWorldMessage:
    """                                                       __doc__
    [DEF-ME]
    [DOC-ME]

    USAGE:     with HelloWorldMessage() as aContextManagerFUSEd_class_INSTANCE:
                    # may              use aContextManagerFUSEd_class_INSTANCE
                    # and shall safely
                    #     gracefully terminate locally spawned ZeroMQ resources
    PARAMETERS:
    RETURNS:   
    THROWS:    
    EXAMPLE:   
    REF.s:

    [TEST-ME]
    [PERF-ME]
    [PUB-ME]
    """
    def __init__( self, url  = '127.0.0.1',
                        port = '5555'
                        ):

        self._url = "tcp://{}:{}".format( url, port )
        #---------------------------------------------------- CONTEXT:
        self._ctx = Context.instance();                       print( "INF: zmq.asyncio.Context() set" if ( zmq.ZMQError() == 0 ) else "ERR[1]: {0:}".format( zmq.ZMQError() ) )
        #---------------------------------------------------- SUB:
        self._sub = self._ctx.socket(zmq.SUB );               print( "INF: zmq.SUB set"               if ( zmq.ZMQError() == 0 ) else "ERR[2]: {0:}".format( zmq.ZMQError() ) )
        self._sub.bind(                  self._url );         print( "INF: zmq.SUB.bind() done"       if ( zmq.ZMQError() == 0 ) else "ERR[3]: {0:}".format( zmq.ZMQError() ) )
        self._sub.setsockopt(        zmq.LINGER, 1 );         print( "INF: zmq.SUB LINGER set"        if ( zmq.ZMQError() == 0 ) else "ERR[4]: {0:}".format( zmq.ZMQError() ) )
        self._sub.setsockopt(        zmq.SUBSCRIBE, b'world');print( "INF: zmq.SUB subscribed"        if ( zmq.ZMQError() == 0 ) else "ERR[5]: {0:}".format( zmq.ZMQError() ) )
        #---------------------------------------------------- PUB:
        self._pub = self._ctx.socket(zmq.PUB );               print( "INF: zmq.PUB set"               if ( zmq.ZMQError() == 0 ) else "ERR[6]: {0:}".format( zmq.ZMQError() ) )
        self._pub.setsockopt(        zmq.LINGER, 1 );         print( "INF: zmq.PUB LINGER set"        if ( zmq.ZMQError() == 0 ) else "ERR[7]: {0:}".format( zmq.ZMQError() ) )
        self._pub.connect(               self._url );         print( "INF: zmq.PUB.connect() done"    if ( zmq.ZMQError() == 0 ) else "ERR[8]: {0:}".format( zmq.ZMQError() ) )
        #----------------------------------------------------
        ...
    def __enter__( self ):
        #---------------------------------------------------- with <class> as <symbol>: CONTEXT MANAGER __enter__()-auto-METHOD
        return self

    def __exit__( self, exc_type, exc_value, traceback ):
        #---------------------------------------------------- with <class> as <symbol>: CONTEXT MANAGER __exit__()-auto-METHOD
        self.try_to_close( self._pub );
        self.try_to_close( self._sub );
        pass;         self._ctx.term()
        return

    ################################################################
    #
    # A       PUB-SENDER ------------------------------------
    async def pub_hello_world( self ):

          self._pObj = PubHelloWorld();                       print( "INF: pObj set on PUB-side"      if ( self._pObj.msg_pub()  # instance-fuse(d)
                                                                                                         ==   "Hello World"    ) else "ERR[9]: {0:}".format( "Hello World" ) )
          try:
               while True:                                    # keep sending messages
                   self._sMsg = self._pObj.msg_pub();         print( "INF: pObj.msg_pub() called"     if ( self._sMsg  != None ) else "ERR[A]: {0:}".format( "msg == ?"    ) )
                   pass;                                      print( self._sMsg )
                   # publish message to topic 'world'
                   # async always needs `send_multipart()`
                   await self._pub.send_multipart( [ b'world',
                                                       bytes( self._sMsg )
                                                       ]
                                                  );          print( "INF: await .send_multipart()"   if ( zmq.ZMQError() == 0 ) else "ERR[B]: {0:}".format( zmq.ZMQError() ) )
                   # slow down message publication
                   await asyncio.sleep( 1 );                  print( "NOP: await .sleep( 1 )"         if ( zmq.ZMQError() == 0 ) else "ERR[C]: {0:}".format( zmq.ZMQError() ) )
          except:
              pass;                                           print( "EXC: thrown on PUB side"        if ( zmq.ZMQError() == 0 ) else "ERR[D]: {0:}".format( zmq.ZMQError() ) )

          finally:
              self._pub.close();                              print( "FIN: PUB.close()-d"             if ( zmq.ZMQError() == 0 ) else "ERR[E]: {0:}".format( zmq.ZMQError() ) )

    ################################################################
    #
    # A       SUB-RECEIVER ---------------------------------
    async def sub_hello_world( self ):

          self._sObj = SubHelloWorld();                       print( "INF: sObj set on SUB-side"      if (  None                 # instance-fuse(d)
                                                                                                         == self._sObj.msg_receive("?")
                                                                                                            )                    else "ERR[F]: {0:}".format( "?"            ) )
          try:
               while True:                                   # keep listening to all published message on topic 'world'
                     pass;                                    print( "INF: await .recv_multipart() about to be called now:" )
                     self._rMsg = await self._sub.recv_multipart()
                     pass;                                    print( "INF: await .recv_multipart()"   if ( zmq.ZMQError() == 0 ) else "ERR[G]: {0:}".format( zmq.ZMQError() ) )
                     pass;                                    print( 'ACK: received: ', self._rMsg )
                     self._sObj.msg_receive( self._rMsg );    print( 'ACK: .msg_receive()-printed.' )
          except:
              pass;                                           print( "EXC: thrown on SUB side"        if ( zmq.ZMQError() == 0 ) else "ERR[H]: {0:}".format( zmq.ZMQError() ) )

          finally:
              self._sub.close();                              print( "FIN: SUB.close()-d"             if ( zmq.ZMQError() == 0 ) else "ERR[I]: {0:}".format( zmq.ZMQError() ) )

    # ---------close()---------------------------------------
    def try_to_close( self, aSocketINSTANCE ):

        try:
            aSocketINSTANCE.close();

        except:
            pass;

        return

2) 最好使用 with HelloworldMessage() as ... : context-manager

我的代码有 2 个错误:

  1. 如@user3666197 所述,PUB/SUB 通信原型需要一些时间 初始化(参见 his/her 回答)。我不得不将 await asyncio.sleep(1) 移动到发布代码上方 (await pub.send_multipart([b'world', msg.encode('ascii')]))
  2. 我把消息编码错了。 bytes(msg) --> msg.encode('ascii')

这个答案与我的问题最相关,但请查看@user3666197 以了解实现 PyZMQ 时的某些设计选择。

建议

asyncio.get_event_loop() 中的 PyZMQ 似乎没有给出错误回溯,因此,将您的代码包装在 try & except 块,例如:

import traceback
import logging

try:
    while True:
        msg_received = await sub.recv_multipart()
        # do other stuff

except Exception as e:
    print("Error with sub world")
    logging.error(traceback.format_exc())