订阅者接收消息缓慢
Subscribers receive messages slowly
我有一个 pyzmq 发布器,它每秒发送大约 1000 条消息。我正在尝试在 asyncio event_loop 中启动大约 10 个订阅者。
它可以工作,但比唯一一个订阅者的速度慢 2.5 倍左右。
代码可能有什么问题?
import asyncio
import zmq
import json
from zmq.backend.cython.constants import NOBLOCK
from zmq.asyncio import Context, Poller
from loop_ import Loop
class Client:
REQUEST_TIMEOUT = 35000
SERVER_ENDPOINT = "tcp://localhost:6666"
def __init__(self, id_):
self.id = id_
def get_task(self):
return asyncio.create_task(self.client_coroutine())
async def client_coroutine(self):
context = Context.instance()
socket = context.socket(zmq.SUB)
socket.connect(self.SERVER_ENDPOINT)
socket.setsockopt(zmq.SUBSCRIBE, b'4')
poller = Poller()
poller.register(socket, zmq.POLLIN)
while True:
event = dict(await poller.poll(self.REQUEST_TIMEOUT))
if event.get(socket) == zmq.POLLIN:
reply = await socket.recv_multipart(flags=NOBLOCK)
if not reply:
break
else:
print(eval(json.loads(reply[1].decode('utf-8'))))
else:
print("No response from server, retrying...")
socket.setsockopt(zmq.LINGER, 0)
socket.close()
poller.unregister(socket)
async def tasks():
_tasks = [Client(id_).get_task() for id_ in range(10)]
done, pending = await asyncio.wait(_tasks, return_when=asyncio.FIRST_EXCEPTION)
loop = asyncio.get_event_loop()
loop.run_until_complete(tasks())
Q : What could possibly be wrong with the code?
鉴于代码使用相同的 localhost
(从使用地址可以看出),第一个可疑点是,有 10 倍以上的工作要处理,这样的工作量总是会给 localhost
的 O/S 和 CPU 带来压力,不是吗?
接下来是传输方式的选择-class。鉴于所有 SUB
-s 都位于与 [ 相同的 localhost
上=14=],所有基于 L3 堆栈的 TCP/IP 协议工作都被浪费了。要比较相对成本(使用 tcp://
transport-class 进行此硬件单一消息传递的附加效果),测试与使用相同inproc://
transport-class,其中 none 协议相关的 TCP/IP-stack 附加处理将发生。
最后但同样重要的是,我的代码永远不会混合不同的事件循环(从 v2.11 开始使用 ZeroMQ,所以有人可能会认为我在避免依赖 async
装饰功能方面有点过时在最近的 py3.6+ 中可用 )
我的代码将使用显式的、非阻塞的、零等待的测试来检测每个aSocketINSTANCE
的消息是否存在,如aSocketINSTANCE.poll( zmq.POLLIN, 0 )
而不是使用任何 "externally" 添加的装饰,它可能报告相同,但通过一些额外的(昂贵且在我的代码控制域之外)事件处理。所有实时、低延迟的用例都力求尽可能减少 latency/overheads,因此在我的项目中使用显式控制总是会胜出,任何 "modern" 语法糖技巧。
无论如何,享受零之禅
我有一个 pyzmq 发布器,它每秒发送大约 1000 条消息。我正在尝试在 asyncio event_loop 中启动大约 10 个订阅者。
它可以工作,但比唯一一个订阅者的速度慢 2.5 倍左右。
代码可能有什么问题?
import asyncio
import zmq
import json
from zmq.backend.cython.constants import NOBLOCK
from zmq.asyncio import Context, Poller
from loop_ import Loop
class Client:
REQUEST_TIMEOUT = 35000
SERVER_ENDPOINT = "tcp://localhost:6666"
def __init__(self, id_):
self.id = id_
def get_task(self):
return asyncio.create_task(self.client_coroutine())
async def client_coroutine(self):
context = Context.instance()
socket = context.socket(zmq.SUB)
socket.connect(self.SERVER_ENDPOINT)
socket.setsockopt(zmq.SUBSCRIBE, b'4')
poller = Poller()
poller.register(socket, zmq.POLLIN)
while True:
event = dict(await poller.poll(self.REQUEST_TIMEOUT))
if event.get(socket) == zmq.POLLIN:
reply = await socket.recv_multipart(flags=NOBLOCK)
if not reply:
break
else:
print(eval(json.loads(reply[1].decode('utf-8'))))
else:
print("No response from server, retrying...")
socket.setsockopt(zmq.LINGER, 0)
socket.close()
poller.unregister(socket)
async def tasks():
_tasks = [Client(id_).get_task() for id_ in range(10)]
done, pending = await asyncio.wait(_tasks, return_when=asyncio.FIRST_EXCEPTION)
loop = asyncio.get_event_loop()
loop.run_until_complete(tasks())
Q : What could possibly be wrong with the code?
鉴于代码使用相同的 localhost
(从使用地址可以看出),第一个可疑点是,有 10 倍以上的工作要处理,这样的工作量总是会给 localhost
的 O/S 和 CPU 带来压力,不是吗?
接下来是传输方式的选择-class。鉴于所有 SUB
-s 都位于与 [ 相同的 localhost
上=14=],所有基于 L3 堆栈的 TCP/IP 协议工作都被浪费了。要比较相对成本(使用 tcp://
transport-class 进行此硬件单一消息传递的附加效果),测试与使用相同inproc://
transport-class,其中 none 协议相关的 TCP/IP-stack 附加处理将发生。
最后但同样重要的是,我的代码永远不会混合不同的事件循环(从 v2.11 开始使用 ZeroMQ,所以有人可能会认为我在避免依赖 async
装饰功能方面有点过时在最近的 py3.6+ 中可用 )
我的代码将使用显式的、非阻塞的、零等待的测试来检测每个aSocketINSTANCE
的消息是否存在,如aSocketINSTANCE.poll( zmq.POLLIN, 0 )
而不是使用任何 "externally" 添加的装饰,它可能报告相同,但通过一些额外的(昂贵且在我的代码控制域之外)事件处理。所有实时、低延迟的用例都力求尽可能减少 latency/overheads,因此在我的项目中使用显式控制总是会胜出,任何 "modern" 语法糖技巧。
无论如何,享受零之禅