PyZMQ req socket - hang on context.term()

Struggling to properly close a simple pyzmq-based client when the server is not available. Below are 2 snippets.

First, the server. This is more or less the pyzmq example, with no special code:

import zmq
import json

port = 5555

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:{0}".format(port))

while True:
    message = socket.recv_json()
    print(json.dumps(message))
    socket.send_json({'response': 'Hello'})

Next, the client.

import zmq

ip = 'localhost'
port = 5555
addr ="tcp://{0}:{1}".format(ip, port)
message = {'value': 10}

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(addr)

if socket.poll(timeout=1000, flags=zmq.POLLOUT) != 0:
    socket.send_json(message, flags=zmq.NOBLOCK)        
    if socket.poll(timeout=1000, flags=zmq.POLLIN) != 0:
        response = socket.recv_json()
        print(response)

socket.disconnect(addr)
socket.close(linger=0)
context.term()

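Here I have tried to extend the default client with the ability to time out when the server is not available. The code above uses the poll method, although I have also tried setting a receive timeout on the socket.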
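For comparison, the receive-timeout variant could look roughly like the sketch below. It assumes pyzmq on libzmq 4.x; `request_with_timeout` is a hypothetical helper name. It sets `LINGER` to 0 before anything is queued and uses `SNDTIMEO`/`RCVTIMEO` so that `send_json()`/`recv_json()` raise `zmq.Again` instead of blocking forever.

```python
import zmq


def request_with_timeout(addr, message, timeout_ms=1000):
    """Send one JSON request over REQ and wait at most timeout_ms for the reply.

    Returns the decoded reply, or None if nothing arrived in time.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    # Drop undelivered messages on close(), so ctx.term() cannot wait on them.
    sock.setsockopt(zmq.LINGER, 0)
    # Turn blocking send/recv into zmq.Again after timeout_ms.
    sock.setsockopt(zmq.SNDTIMEO, timeout_ms)
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
    try:
        sock.connect(addr)
        sock.send_json(message)
        return sock.recv_json()
    except zmq.Again:
        return None
    finally:
        sock.close()   # uses the LINGER=0 set above
        ctx.term()
```

Each call builds and tears down its own socket, which also sidesteps the problem of reusing a REQ socket after a timed-out `recv()`.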

If the server is running, the client sends, receives the response and exits cleanly.

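If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call. My understanding, from searching, is that this would hang if there were unclosed sockets, but that does not seem to be the case here.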
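One way to check whether `term()` really blocks on a queued, undeliverable request is to time the shutdown directly. The sketch below is a hypothetical helper (`time_shutdown`), assuming pyzmq on libzmq 4.x with the default `IMMEDIATE=0` and nothing listening at `addr`. It queues one message and measures `close(linger=...)` followed by `term()`.

```python
import time

import zmq


def time_shutdown(addr, linger_ms=0):
    """Queue one request for an absent peer, then time close() + term()."""
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    sock.connect(addr)
    # With IMMEDIATE=0 the message is accepted and queued even though
    # no peer is listening at addr.
    sock.send_json({"value": 10}, flags=zmq.NOBLOCK)
    start = time.monotonic()
    sock.close(linger=linger_ms)  # discard queued messages after linger_ms
    ctx.term()                    # returns once every socket is closed
    return time.monotonic() - start
```

With `linger_ms=0` the call should return almost at once. A positive value lets `term()` wait for roughly that long, and a socket left at LINGER -1 (the default the log below reports) would make it wait indefinitely.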

Any help is much appreciated.

On "the ability to time out when the server is not available":

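A timeout is possible, yet it will not let the hard-wired REQ/REP two-step dance survive, let alone continue, once one side times out on a mandatory step of the distributed Finite State Automaton scheme (the dFSA cannot take a one-sided shortcut; it is a two-sided dFSA).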
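The refused one-sided shortcut can be observed directly: after a timed-out `poll()`, a REQ socket still expects `recv()`, so a second `send()` fails with `EFSM`. The sketch below is a hypothetical helper (`retry_after_timeout`), assuming pyzmq on libzmq 4.x and no server at `addr`. It also shows the common way out, as in the zguide's "Lazy Pirate" pattern: close the stuck socket and start a fresh dance on a new one.

```python
import zmq


def retry_after_timeout(addr, timeout_ms=200):
    """Return the errno raised when a REQ socket skips its mandatory recv()."""
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(addr)
    sock.send_json({"value": 10})                  # REQ step 1: send
    err = None
    if sock.poll(timeout=timeout_ms, flags=zmq.POLLIN) == 0:
        # No reply: the REQ state machine still expects recv() next.
        try:
            sock.send_json({"value": 10})          # one-sided shortcut
        except zmq.ZMQError as exc:
            err = exc.errno                        # expected: zmq.EFSM
        # Way out: discard the stuck socket and begin a new dance.
        sock.close()
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)
        sock.connect(addr)
        sock.send_json({"value": 10})              # allowed on the fresh socket
    sock.close()
    ctx.term()
    return err
```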


Hypothesis:

If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call.

Verification:

Let's review the code step by step:

def  Test( SetImmediate = False ):
     ##################################################################################
     import zmq, json, time;                                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
     ##################################################################################
     ip      = 'localhost';                                                       print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
     port    =  5555;                                                             print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
     addr    = "tcp://{0}:{1}".format( ip, port );                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
     message = { 'value': 10 };                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
     ##################################################################################
     context = zmq.Context();                                                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
     pass;              aReqSock = context.socket( zmq.REQ );                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ),                        "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################################################################################################################################################################
     pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     pass;              aReqSock.setsockopt(       zmq.LINGER,    0 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER    ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
     pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     ##################################################################################################################################################################################################################################
     pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     if SetImmediate:
                        aReqSock.setsockopt(       zmq.IMMEDIATE, 1 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incomplete connections
     pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     ##################################################################################################################################################################################################################################
     pass;              aReqSock.connect( addr );                                 print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ),                      "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################
     pass;        rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ),   "|", zmq.strerror( zmq.zmq_errno() ) )
     if      0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
         pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... ==  " ), rc )
         pass;          aReqSock.send_json( message,   flags = zmq.NOBLOCK )      # .send()-s dispatches message the REP-side may .recv() at some later time
         pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ),            "|", zmq.strerror( zmq.zmq_errno() ) )
         pass;    rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ),    "|", zmq.strerror( zmq.zmq_errno() ) )
         if  0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
             pass;                                                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
             response = aReqSock.recv_json()                                      # .recv() BLOCKS until ... if ever ...
             print( response );                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
     pass;                                                                        print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
     ##################################################################################
     rc = aReqSock.disconnect( addr );                                            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     rc = aReqSock.close(      linger = 0 );                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     rc = context.term();                                                         print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################

This produces something like the following:

>>> Test( SetImmediate = False )
____947107.0356056700_ACK: import-s: DONE... VER:  4.2.5
____947107.0356727780_ACK: ip SET...
____947107.0356969039_ACK: port SET...
____947107.0357236000_ACK: addr SET...
____947107.0357460320_ACK: message SET...
____947107.0358552620_ACK: Context INSTANTIATED... | Success
____947107.0362445670_ACK: Socket INSTANTIATED... | Success
____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947107.0367464600_ACK: rc was NON-ZERO... ==   2
____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
____947108.0382736750_ACK: if-ed code-block COMPLETED
____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
____947108.0384234400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
____947108.0386644470_ACK: Context.term() RETURNED CODE ~  None | Success

>>> Test( SetImmediate = True )
____947119.1267617550_ACK: import-s: DONE... VER:  4.2.5
____947119.1268189061_ACK: ip SET...
____947119.1268382660_ACK: port SET...
____947119.1268587380_ACK: addr SET...
____947119.1268772170_ACK: message SET...
____947119.1269678050_ACK: Context INSTANTIATED... | Success
____947119.1271884360_ACK: Socket INSTANTIATED... | Success
____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947120.1287937190_ACK: if-ed code-block COMPLETED
____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
____947120.1289412400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
____947120.1291404651_ACK: Context.term() RETURNED CODE ~  None | Success

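This proves the hypothesis incorrect: there is no problem with context.term(). What matters is how a .connect( aTransportClass_Target ) against a non-existent target is being handled internally.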
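The role of `IMMEDIATE` in the two runs can be reproduced in isolation. A sketch, assuming pyzmq on libzmq 4.x, nothing listening at `addr`, and a hypothetical helper `probe_pollout`: with `IMMEDIATE=0` a freshly connected REQ socket should report `POLLOUT` at once, because outgoing messages are queued for the pending connection; with `IMMEDIATE=1` it stays unwritable until a connection completes, so `poll()` waits out its full timeout.

```python
import time

import zmq


def probe_pollout(addr, immediate, timeout_ms=500):
    """Return (events, seconds) from poll(POLLOUT) right after connect()."""
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)
    sock.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
    sock.connect(addr)
    start = time.monotonic()
    events = sock.poll(timeout=timeout_ms, flags=zmq.POLLOUT)
    elapsed = time.monotonic() - start
    sock.close()
    ctx.term()
    return events, elapsed
```

Against a dead endpoint, `probe_pollout(addr, immediate=False)` should return `zmq.POLLOUT` almost instantly, while `probe_pollout(addr, immediate=True)` should return 0 after roughly `timeout_ms`, in line with the two logs.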

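To my surprise, in the tested version ( v4.2.5 ) .poll( zmq.POLLOUT ) reports 2 ( rc == 2 ) for the .POLLOUT direction of a TxQueue the user has not touched yet, without any explicit .send() having been made (the .poll() comes right after the .connect()).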
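One way to read that `2` is as an event mask rather than a count: pyzmq's `Socket.poll()` returns the events that are ready (0 on timeout), and `zmq.POLLOUT` has the value 2. A small decoder, using the hypothetical name `describe_events`, makes this explicit.

```python
import zmq


def describe_events(events):
    """Translate the bitmask returned by Socket.poll() into flag names."""
    names = []
    if events & zmq.POLLIN:
        names.append("POLLIN")
    if events & zmq.POLLOUT:
        names.append("POLLOUT")
    if events & zmq.POLLERR:
        names.append("POLLERR")
    return names or ["timeout"]
```

Under this reading, `describe_events(2)` gives `['POLLOUT']`: the socket is ready to accept a send, which with IMMEDIATE=0 holds as soon as `connect()` has set up the outgoing queue.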

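This seems to me somewhat inconsistent with previous versions (as if it were trying to report .connect()-related "protocol/identity" telemetry instead of reporting only user-application-level messages).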

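While I may be wrong in trying to find a rationale for why an essentially empty queue would already report something in its .POLLOUT direction, I hope I have sufficiently demonstrated that the problem has nothing to do with .LINGER == 0 or with the .term()-ination of the Context()-instance.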

Q.E.D.