zmq 连接到使用子进程打开的脚本

zmq connection to a script opened with subprocess

我有一个相当复杂的基于套接字(ZeroMQ - REQ/REP)的 python 程序,我想验证它是否由 运行 运行同一台机器上的简单套接字脚本。

测试脚本就是这样的。

import subprocess
import zmq
import json

# ...

for call, response in zip(test_calls, expected_responses):
    p = subprocess.Popen(['python', 'main.py'], stdout=subprocess.PIPE)

    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.RCVTIMEO, 1000)
    socket.connect("tcp://localhost:8084")

    socket.send_string(json.dumps(call))
    r = json.loads(socket.recv_string())

    assert r == response

    p.terminate()
    socket.close()

(可能值得注意,它实际上是在 nose2 中使用这样的测试实现的,但我觉得这超出了这个问题的范围并且确实会使示例复杂化。这几乎总结了测试中发生的事情)。

85% 的情况下,这会奏效,一切都会过去。哇哦!另外 15% 的时间,我在 r = json.loads(socket.recv_string()) 行得到一个 zmq.error.Again: Resource temporarily unavailable(如果我没有设置 zmq.RCVTIMEO,它就会挂起)。

想知道这是否是一个计时过程(子过程不能及时 start/stop)我在这个地方打了几个 time.sleep() 电话,但它似乎没有做任何事情。

我在套接字部分之后放了一个 catch 并 pdb 进去了,检查 python 子进程的 stdout。我在应用程序中有一些 print 语句打印到 stdout 通过套接字进行的每次调用和响应,但它没有收到任何输入,所以 recv 当然会超时。

我以前从未遇到过 zmq 的此类问题,所以我认为它可能与这样的子进程的使用有关。有谁知道问题可能出在哪里以及如何解决?

谢谢。


更新: 所以看起来进程没有终止(尽管在主应用程序中使用了 signal.signal(signal.SIGTERM, close_app) 信号)。这会导致对通过 zmq 进行通信的活动进程产生混淆吗?最初调用 p.kill() 而不是 p.terminate() 似乎可以解决问题,尽管它仍然以同样的方式失败了一两次。


更新 2: 似乎有效的东西是直接调用命令 kill

subprocess.call(['kill', str(p.pid)])
counter = 0
while p.poll() is None:
    time.sleep(0.1)
    counter += 1
    if counter > 20:
        p.kill()

在大多数情况下似乎可以优雅地关闭它。

可能是什么问题

可能与所述子流程中未 posted 的代码有关,这导致观察到的行为在子流程强制终止期间启动(包括它的其他资源,在智能和管理中进行管理)功能非常丰富的多线程 zmq.Context( n_IO_threads = 1 ) 实例,在您的视线之外并且仅在有限的先验编码/执行控件内)。

考虑{SIGTERM|SIGKILL|...}而不是紧急制动,
紧急按钮,
在分布式系统设计中不是明智的解决方案

一旦进入分布式系统设计,人们宁愿忘记使用类似 SIGTERM 等的无上下文工具,而是最好将自己的软信号控制平面合并到新的系统中设计的分布式系统基础架构。

这有助于 "remote"-agent 的行为与此类软信号的实际上下文一致,并允许执行(在您的完全算法控制下)所有必要的安全保护、资源清理和预终止职责,以便最终优雅地清理退出。

我在这方面听起来可能有点过时,但在您的代码最终指示所有 zmq.Context() 实例到 .term()。据报道这不是必需的,但是干净和公平地进行资源处理是恕我直言,分布式系统设计/实现中的公平职责。

没有例外,没有借口。

ZMQ_LINGER

中忘记了该死的零

一个值得一提的例子是ZeroMQ API参数的默认值ZMQ_LINGER,如果没有设置,它的默认值为 0 这意味着,一旦这样的 ZeroMQ-socket 实例被指示(显式或隐式)到 .close() 并且碰巧还有一个 ZMQ_LINGER == 0,套接字端点将 BLOCK 直到来自交易对手的所有消息缓冲区被传递,这可能会导致您的分布式处理挂起而没有任何机会解决这种死锁 ex-post,如果没有被正确地预先设置,则不要永远等待未决消息。

较新的 pyzmq 文档明确警告不要 .destroy() 一个 zmq.Context 实例(并盲目地让权威发布的 [=25= 获取套接字 .close()-d ], 这是自己代码控制之外的)

ctx.destroy( linger = None )
Close all sockets associated with this context, and then terminate the context. If linger is specified, the LINGER sockopt of the sockets will be set prior to closing.

Warning

.destroy involves calling zmq_close(), which is NOT threadsafe. If there are active sockets in other threads, this must not be called ( which advice, most probably, the SIGTERM & al will ignore, wouldn't it? )

所以还有更多理由不依赖 SIGTERM 魔鬼的服务。

正在使用的端口

另外,释放占用的transport-class资源需要一定的时间。因此,拥有刚刚发布 IP:port 的代码并不意味着另一个实例/进程/线程可能会正确跳入并抢占同一个端口,而无需 O/S-related 延迟。而是在这方面也检查你的资源重用/释放策略(我敢冒在这方面阻塞的风险并使用一些端口地址池来轮换和 FILO-排队,以便至少推迟任何潜在的重用案例,直到合理的 O/S-related 延迟已经过期 - 恕我直言,防止阻塞状态比 ex-post 处理阻塞状态的异常)。

.bind().connect()

之前

是另一个这样的问题。一旦您的 subprocess.Popen(...) 启动,O/S 服务启动并使子进程开始自行呼吸需要一些时间。

如果您的第一个进程,已经激活并正在执行,已经达到 .connect(),在派生的子进程实例到达它 .bind() 之前,分布式系统将会阻塞。

Setup/dismantle 往返时间不能减少到零。资源不是一次性的。使用它会产生一些与系统相关的维护和共享开销。

最后 .recv_string() 可能并且确实提出 ZMQError EAGAIN

在某些情况下,本地节点中还没有任何消息准备好通过任何 .recv*() 方法检索它,无论是 {.recv|.recv_string|.recv_json|&al}flags = zmq.NOBLOCK 模式下。