使用 PyZMQ 在异步和同步任务之间进行进程间通信
Inter-process communication between async and sync tasks using PyZMQ
在单个进程中,我有一个任务 运行ning 在一个产生值并广播它们的线程上,并且
运行 在 asyncio 循环中并发执行的多个消费者异步任务。
我发现this issue on PyZMQ's github询问异步<->同步通信
使用 inproc 套接字,这也是我想要的,答案是在 .shadow(ctx.underlying)
时使用
创建异步 ZMQ 上下文。
我准备了这个例子,看起来工作正常:
import signal
import asyncio
import zmq
import threading
import zmq.asyncio
import sys
import time
import json
def producer(ctrl):
# delay first push to give asyncio loop time
# to start
time.sleep(1)
ctx = ctrl["ctx"]
s = ctx.socket(zmq.PUB)
s.bind(ctrl["endpoint"])
v = 0
while ctrl["run"]:
payload = {"value": v, "timestamp": time.time()}
msg = json.dumps(payload).encode("utf-8")
s.send(msg)
v += 1
time.sleep(5)
print("Bye")
def main():
endpoint = "inproc://testendpoint"
ctx = zmq.Context()
actx = zmq.asyncio.Context.shadow(ctx.underlying)
ctrl = {"run": True, "ctx": ctx, "endpoint": endpoint, }
th = threading.Thread(target=producer, args=(ctrl,))
th.start()
try:
asyncio.run(amain(actx, endpoint))
except KeyboardInterrupt:
pass
print("Stopping thread")
ctrl["run"] = False
th.join()
async def amain(ctx, endpoint):
s = ctx.socket(zmq.SUB)
s.subscribe("")
s.connect(endpoint)
loop = asyncio.get_running_loop()
def stop():
try:
print("Closing zmq async socket")
s.close()
except:
pass
raise KeyboardInterrupt
loop.add_signal_handler(signal.SIGINT, stop)
while True:
event = await s.poll(1000)
if event & zmq.POLLIN:
msg = await s.recv()
payload = json.loads(msg.decode("utf-8"))
print("%f: %d" % (payload["timestamp"], payload["value"]))
if __name__ == "__main__":
sys.exit(main())
以这种方式在线程和 asyncio 任务之间使用 inproc://*
安全吗? 0MQ
context 是线程安全的,我不会在线程和
asyncio 任务,所以我一般会说这是线程安全的,对吧?还是我
缺少我应该考虑的东西?
Q :
Is it safe to use inproc://*
between a thread and asyncio task in this way?""
A :
首先,我可能大错特错(不仅在这里),但自原生 API 2.1 以来我一直在使用 ZeroMQ .1+ 我敢说,除非更新的“改进”失去了核心原则( ZeroMQ ZMTP/RFC-documented properties for building legal implementation of the still valid ZMTP-arsenal ),这里的答案应该是肯定的,就像pyzmq-binding 的较新版本毫无妥协地保留了 inproc:
-Transport-Class 的所有强制属性。
Q :
" The 0MQ context is thread safe and I'm not sharing sockets between the thread and the asyncio
task, so I would say in general that this is thread safe, right? "
A :
我的麻烦从这里开始——ZeroMQ 实现是基于 Martin SUSTRIK 和 Pieter HINTJENS 的 Zen-of-Zero 开发的——也就是as Zero-sharing——所以从不共享是原则(尽管“共享”-zmq.Context
-实例在不同线程中使用没有问题,与zmq.Socket
-实例相反)
Python(从那时起并且在 2022 年第一季度仍然有效)曾经使用并且仍然使用总共 [CONCURRENT]
-code-execution 个回避者——被 GIL-lock 阻止,这主要避免了由 [CONCURRENT]
-code-execution 引起的任何和所有类型的问题,这些问题永远不会发生内部 Python GIL-lock 重新 [SERIAL]
-ised 流程 code-execution,因此即使 asyncio
部分是作为生态系统的 pythonic (non-destructive) 部分构建的,您的代码也永远不会“遇到”任何类型的 concurrency-related 问题,因为除非它获得 GIL-lock,它只会“挂在 NOP
-s 破解中”(空闲循环中的 nuts-cracking)。
在同一个进程中,生成另一个 Context
实例似乎没有任何优势(这曾经是 rock-solid 的确定性,永远不会增加任何类型的开销- Zen-of-Zero (差不多)Zero-overhead ...)。如果需要性能或延迟需求,Sig/Msg 核心引擎会在实例化时提供更多 zmq.Context( IOthreads )
,但这些引擎是 zmq.Context
拥有的,而不是 Python-GIL-governed/(b) 锁定的线程,因此性能具有很好的可扩展性,不会浪费任何 RAM/HWM/buffers/...-资源,不会增加任何开销并且非常高效,因为 IO-threads 是 co-located 确实 I/O-work,所以 inproc:
-( protocol-less )-Transport-Class 根本不需要)
Q :
" Or am I missing something that I should consider? "
A :
Mixing asyncio
, O/S-signals(它们与本机 ZeroMQ 交互的方式有详细记录 API ) 和其他层次的复杂性肯定是可能的,但它是有代价的——它使 use-case 的可读性越来越差,并且越来越容易出现 conceptual-gaps 和类似的难以解码的“错误”。
我记得使用 Tkinter-mainloop()
作为一个 cost-wise 非常便宜的 super-stable 框架 rapid-prototyping MVC-{ M-model,V-isual,C-ontroller }-many-actors' 的部分实际上是 distributed-system 应用程序在 Python。有 Zerop-problems 将 ZeroMQ 与单个 Context
实例一起使用,将各个 AccessNodes 的引用传递到任意数量的 event-handlers,假设我们保留 ZeroMQ Zen-of-Zero,即不“共享”(意思是没有两个部分“使用”(竞争使用)同一个接入点“one-over-another”)
这一切都是 designed-in,在“Zero-cost”,由 ZeroMQ by-definition,所以除非在后面的某个阶段被破坏,re-wrapping 一个 re-wrapped 原生 API,这一切应该在 2022-Q1 仍然有效,不是吗?
在单个进程中,我有一个任务 运行ning 在一个产生值并广播它们的线程上,并且 运行 在 asyncio 循环中并发执行的多个消费者异步任务。
我发现this issue on PyZMQ's github询问异步<->同步通信
使用 inproc 套接字,这也是我想要的,答案是在 .shadow(ctx.underlying)
时使用
创建异步 ZMQ 上下文。
我准备了这个例子,看起来工作正常:
import signal
import asyncio
import zmq
import threading
import zmq.asyncio
import sys
import time
import json
def producer(ctrl):
# delay first push to give asyncio loop time
# to start
time.sleep(1)
ctx = ctrl["ctx"]
s = ctx.socket(zmq.PUB)
s.bind(ctrl["endpoint"])
v = 0
while ctrl["run"]:
payload = {"value": v, "timestamp": time.time()}
msg = json.dumps(payload).encode("utf-8")
s.send(msg)
v += 1
time.sleep(5)
print("Bye")
def main():
endpoint = "inproc://testendpoint"
ctx = zmq.Context()
actx = zmq.asyncio.Context.shadow(ctx.underlying)
ctrl = {"run": True, "ctx": ctx, "endpoint": endpoint, }
th = threading.Thread(target=producer, args=(ctrl,))
th.start()
try:
asyncio.run(amain(actx, endpoint))
except KeyboardInterrupt:
pass
print("Stopping thread")
ctrl["run"] = False
th.join()
async def amain(ctx, endpoint):
s = ctx.socket(zmq.SUB)
s.subscribe("")
s.connect(endpoint)
loop = asyncio.get_running_loop()
def stop():
try:
print("Closing zmq async socket")
s.close()
except:
pass
raise KeyboardInterrupt
loop.add_signal_handler(signal.SIGINT, stop)
while True:
event = await s.poll(1000)
if event & zmq.POLLIN:
msg = await s.recv()
payload = json.loads(msg.decode("utf-8"))
print("%f: %d" % (payload["timestamp"], payload["value"]))
if __name__ == "__main__":
sys.exit(main())
以这种方式在线程和 asyncio 任务之间使用 inproc://*
安全吗? 0MQ
context 是线程安全的,我不会在线程和
asyncio 任务,所以我一般会说这是线程安全的,对吧?还是我
缺少我应该考虑的东西?
Q :
Is it safe to useinproc://*
between a thread and asyncio task in this way?""
A :
首先,我可能大错特错(不仅在这里),但自原生 API 2.1 以来我一直在使用 ZeroMQ .1+ 我敢说,除非更新的“改进”失去了核心原则( ZeroMQ ZMTP/RFC-documented properties for building legal implementation of the still valid ZMTP-arsenal ),这里的答案应该是肯定的,就像pyzmq-binding 的较新版本毫无妥协地保留了 inproc:
-Transport-Class 的所有强制属性。
Q :
" The 0MQ context is thread safe and I'm not sharing sockets between the thread and theasyncio
task, so I would say in general that this is thread safe, right? "
A :
我的麻烦从这里开始——ZeroMQ 实现是基于 Martin SUSTRIK 和 Pieter HINTJENS 的 Zen-of-Zero 开发的——也就是as Zero-sharing——所以从不共享是原则(尽管“共享”-zmq.Context
-实例在不同线程中使用没有问题,与zmq.Socket
-实例相反)
Python(从那时起并且在 2022 年第一季度仍然有效)曾经使用并且仍然使用总共 [CONCURRENT]
-code-execution 个回避者——被 GIL-lock 阻止,这主要避免了由 [CONCURRENT]
-code-execution 引起的任何和所有类型的问题,这些问题永远不会发生内部 Python GIL-lock 重新 [SERIAL]
-ised 流程 code-execution,因此即使 asyncio
部分是作为生态系统的 pythonic (non-destructive) 部分构建的,您的代码也永远不会“遇到”任何类型的 concurrency-related 问题,因为除非它获得 GIL-lock,它只会“挂在 NOP
-s 破解中”(空闲循环中的 nuts-cracking)。
在同一个进程中,生成另一个 Context
实例似乎没有任何优势(这曾经是 rock-solid 的确定性,永远不会增加任何类型的开销- Zen-of-Zero (差不多)Zero-overhead ...)。如果需要性能或延迟需求,Sig/Msg 核心引擎会在实例化时提供更多 zmq.Context( IOthreads )
,但这些引擎是 zmq.Context
拥有的,而不是 Python-GIL-governed/(b) 锁定的线程,因此性能具有很好的可扩展性,不会浪费任何 RAM/HWM/buffers/...-资源,不会增加任何开销并且非常高效,因为 IO-threads 是 co-located 确实 I/O-work,所以 inproc:
-( protocol-less )-Transport-Class 根本不需要)
Q :
" Or am I missing something that I should consider? "
A :
Mixing asyncio
, O/S-signals(它们与本机 ZeroMQ 交互的方式有详细记录 API ) 和其他层次的复杂性肯定是可能的,但它是有代价的——它使 use-case 的可读性越来越差,并且越来越容易出现 conceptual-gaps 和类似的难以解码的“错误”。
我记得使用 Tkinter-mainloop()
作为一个 cost-wise 非常便宜的 super-stable 框架 rapid-prototyping MVC-{ M-model,V-isual,C-ontroller }-many-actors' 的部分实际上是 distributed-system 应用程序在 Python。有 Zerop-problems 将 ZeroMQ 与单个 Context
实例一起使用,将各个 AccessNodes 的引用传递到任意数量的 event-handlers,假设我们保留 ZeroMQ Zen-of-Zero,即不“共享”(意思是没有两个部分“使用”(竞争使用)同一个接入点“one-over-another”)
这一切都是 designed-in,在“Zero-cost”,由 ZeroMQ by-definition,所以除非在后面的某个阶段被破坏,re-wrapping 一个 re-wrapped 原生 API,这一切应该在 2022-Q1 仍然有效,不是吗?