使用 PyZMQ 在异步和同步任务之间进行进程间通信

Inter-process communication between async and sync tasks using PyZMQ

在单个进程中,我有一个任务 运行ning 在一个产生值并广播它们的线程上,并且 运行 在 asyncio 循环中并发执行的多个消费者异步任务。

我发现this issue on PyZMQ's github询问异步<->同步通信 使用 inproc 套接字,这也是我想要的,答案是在 .shadow(ctx.underlying) 时使用 创建异步 ZMQ 上下文。

我准备了这个例子,看起来工作正常:

import signal
import asyncio
import zmq
import threading
import zmq.asyncio
import sys
import time
import json


def producer(ctrl):
    # delay first push to give asyncio loop time
    # to start
    time.sleep(1)

    ctx = ctrl["ctx"]

    s = ctx.socket(zmq.PUB)

    s.bind(ctrl["endpoint"])

    v = 0
    while ctrl["run"]:
        payload = {"value": v, "timestamp": time.time()}

        msg = json.dumps(payload).encode("utf-8")

        s.send(msg)
        v += 1
        time.sleep(5)

    print("Bye")


def main():
    endpoint = "inproc://testendpoint"
    ctx = zmq.Context()
    actx = zmq.asyncio.Context.shadow(ctx.underlying)

    ctrl = {"run": True, "ctx": ctx, "endpoint": endpoint, }

    th = threading.Thread(target=producer, args=(ctrl,))
    th.start()

    try:
        asyncio.run(amain(actx, endpoint))
    except KeyboardInterrupt:
        pass

    print("Stopping thread")
    ctrl["run"] = False
    th.join()


async def amain(ctx, endpoint):
    s = ctx.socket(zmq.SUB)
    s.subscribe("")
    s.connect(endpoint)

    loop = asyncio.get_running_loop()

    def stop():
        try:
            print("Closing zmq async socket")
            s.close()
        except:
            pass

        raise KeyboardInterrupt

    loop.add_signal_handler(signal.SIGINT, stop)

    while True:
        event = await s.poll(1000)
        if event & zmq.POLLIN:
            msg = await s.recv()
            payload = json.loads(msg.decode("utf-8"))

            print("%f: %d" % (payload["timestamp"], payload["value"]))


if __name__ == "__main__":
    sys.exit(main())

以这种方式在线程和 asyncio 任务之间使用 inproc://* 安全吗? 0MQ context 是线程安全的,我不会在线程和 asyncio 任务,所以我一般会说这是线程安全的,对吧?还是我 缺少我应该考虑的东西?

Q :
Is it safe to use inproc://* between a thread and asyncio task in this way?""

A :
首先,我可能大错特错(不仅在这里),但自原生 API 2.1 以来我一直在使用 ZeroMQ .1+ 我敢说,除非更新的“改进”失去了核心原则( ZeroMQ ZMTP/RFC-documented properties for building legal implementation of the still valid ZMTP-arsenal ),这里的答案应该是肯定的,就像pyzmq-binding 的较新版本毫无妥协地保留了 inproc:-Transport-Class 的所有强制属性。

Q :
" The 0MQ context is thread safe and I'm not sharing sockets between the thread and the asyncio task, so I would say in general that this is thread safe, right? "

A :
我的麻烦从这里开始——ZeroMQ 实现是基于 Martin SUSTRIK 和 Pieter HINTJENS 的 Zen-of-Zero 开发的——也就是as Zero-sharing——所以从不共享是原则(尽管“共享”-zmq.Context-实例在不同线程中使用没有问题,与zmq.Socket-实例相反)

Python(从那时起并且在 2022 年第一季度仍然有效)曾经使用并且仍然使用总共 [CONCURRENT]-code-execution 个回避者——被 GIL-lock 阻止,这主要避免了由 [CONCURRENT]-code-execution 引起的任何和所有类型的问题,这些问题永远不会发生内部 Python GIL-lock 重新 [SERIAL]-ised 流程 code-execution,因此即使 asyncio 部分是作为生态系统的 pythonic (non-destructive) 部分构建的,您的代码也永远不会“遇到”任何类型的 concurrency-related 问题,因为除非它获得 GIL-lock,它只会“挂在 NOP-s 破解中”(空闲循环中的 nuts-cracking)。

在同一个进程中,生成另一个 Context 实例似乎没有任何优势(这曾经是 rock-solid 的确定性,永远不会增加任何类型的开销- Zen-of-Zero (差不多)Zero-overhead ...)。如果需要性能或延迟需求,Sig/Msg 核心引擎会在实例化时提供更多 zmq.Context( IOthreads ),但这些引擎是 zmq.Context 拥有的,而不是 Python-GIL-governed/(b) 锁定的线程,因此性能具有很好的可扩展性,不会浪费任何 RAM/HWM/buffers/...-资源,不会增加任何开销并且非常高效,因为 IO-threads 是 co-located 确实 I/O-work,所以 inproc:-( protocol-less )-Transport-Class 根本不需要)

Q :
" Or am I missing something that I should consider? "

A :
Mixing asyncio, O/S-signals(它们与本机 ZeroMQ 交互的方式有详细记录 API ) 和其他层次的复杂性肯定是可能的,但它是有代价的——它使 use-case 的可读性越来越差,并且越来越容易出现 conceptual-gaps 和类似的难以解码的“错误”。

我记得使用 Tkinter-mainloop() 作为一个 cost-wise 非常便宜的 super-stable 框架 rapid-prototyping MVC-{ M-model,V-isual,C-ontroller }-many-actors' 的部分实际上是 应用程序在 Python。有 Zerop-problems 将 ZeroMQ 与单个 Context 实例一起使用,将各个 AccessNodes 的引用传递到任意数量的 event-handlers,假设我们保留 ZeroMQ Zen-of-Zero,即不“共享”(意思是没有两个部分“使用”(竞争使用)同一个接入点“one-over-another”)

这一切都是 designed-in,在“Zero-cost”,由 ZeroMQ by-definition,所以除非在后面的某个阶段被破坏,re-wrapping 一个 re-wrapped 原生 API,这一切应该在 2022-Q1 仍然有效,不是吗?