0MQ:你能在超时后以 REQ/REP 模式丢弃消息吗?

0MQ: Can you drop a message after a timeout in a REQ/REP pattern?

在我的 0MQ 应用程序中,我通常这样做来处理超时:

import zmq

ctx = zmq.Context()

s = ctx.socket(zmq.DEALER)
s.connect("tcp://localhost:5555")

# send PING request
v = <some unique value>
s.send_multipart(["PING", v])
if s.poll(timeout * 1000) & zmq.POLLIN:
    msg = s.recv_multipart()
    ...

但是如果服务器不在运行,几分钟后上线,那么0MQ会自动重连并发送消息。 但是,如果我将 send PING 命令 放在一个循环中(每秒一次)并且 服务器已关闭,然后一旦服务器恢复在线,我将继续下一个 recv_multipart()-调用旧消息 在服务器离线时保留在内部 0MQ 队列中。因为我不在乎老 消息,虽然我可以这样做:


s = ctx.socket(zmq.DEALER)
s.connect("tcp://localhost:5555")

while True:
    # send PING request
    v = <some unique value>
    s.send_multipart(["PING", v])
    if s.poll(timeout * 1000) & zmq.POLLIN:
        msg = s.recv_multipart()
        ...
    else:
        s.close()
        s = ctx.socket(zmq.DEALER)
        s.connect("tcp://localhost:5555")

    time.sleep(1)

但这是个坏主意,过了一会儿 ctx.socket 提高了 ZMQError: Too many open files。 将 ZMQ_LINGER 套接字选项设置为 0 似乎有帮助 在这里,但现在我不喜欢这种策略,对我来说似乎是错误的。

所以理想情况下,如果读取超时,我想删除之前发送的消息 发生。这 a) 可能 b) 是个好主意吗?我不确定这个 虽然是正确的,但 0MQ 可能能够物理发送 消息,但服务器在发送回任何内容之前崩溃,因此丢弃 是不可能的,因为没有东西可以掉落,是吗?

所以我的问题是:遇到这种情况我该怎么办?我可能在看 这个问题角度不对?

Q : "Can you drop a message after a timeout in a REQ/REP pattern?"

没有

Q : Is this a) possible and b) a good idea at all?

a) 是的。
b) 是的。

Q : what should I do in this situation?

最好避免 REQ/REP 陷入相互僵局的确定性(你永远不知道它何时发生 - 这里有许多帖子有详细信息)+ 设置连接层以便提供自卫方式来交付只有健康连接上的最后一条消息:

...
s = ctx.socket( zmq.DEALER )

s.setsockopt( zmq.LINGER,    0 ) # ALWAYS, no excuse, you never know the peers' versions
s.setsockopt( zmq.IMMEDIATE, 1 ) # prevent sending over incomplete yet connections
s.setsockopt( zmq.CONFLATE,  1 ) # Does not support multi-part, so .pack() payload

s.connect( "tcp://localhost:5555" )
...