订阅者接收消息缓慢

Subscribers receive messages slowly

我有一个 pyzmq 发布器,它每秒发送大约 1000 条消息。我正在尝试在 asyncio event_loop 中启动大约 10 个订阅者。

它可以工作,但比唯一一个订阅者的速度慢 2.5 倍左右。

代码可能有什么问题?

import asyncio
import zmq
import json
from zmq.backend.cython.constants import NOBLOCK
from zmq.asyncio import Context, Poller
from loop_ import Loop


class Client:
    REQUEST_TIMEOUT = 35000
    SERVER_ENDPOINT = "tcp://localhost:6666"

    def __init__(self, id_):
        self.id = id_

    def get_task(self):
        return asyncio.create_task(self.client_coroutine())

    async def client_coroutine(self):
        context = Context.instance()

        socket = context.socket(zmq.SUB)
        socket.connect(self.SERVER_ENDPOINT)
        socket.setsockopt(zmq.SUBSCRIBE, b'4')
        poller = Poller()
        poller.register(socket, zmq.POLLIN)

        while True:
            event = dict(await poller.poll(self.REQUEST_TIMEOUT))
            if event.get(socket) == zmq.POLLIN:
                reply = await socket.recv_multipart(flags=NOBLOCK)
                if not reply:
                    break
                else:
                    print(eval(json.loads(reply[1].decode('utf-8'))))
            else:
                print("No response from server, retrying...")
                socket.setsockopt(zmq.LINGER, 0)
                socket.close()
                poller.unregister(socket)

async def tasks():
    _tasks = [Client(id_).get_task() for id_ in range(10)]
    done, pending = await asyncio.wait(_tasks, return_when=asyncio.FIRST_EXCEPTION)


loop = asyncio.get_event_loop()
loop.run_until_complete(tasks())

Q : What could possibly be wrong with the code?

鉴于代码使用相同的 localhost(从使用地址可以看出),第一个可疑点是,有 10 倍以上的工作要处理,这样的工作量总是会给 localhost 的 O/S 和 CPU 带来压力,不是吗?

接下来是传输方式的选择-class。鉴于所有 SUB-s 都位于与 [ 相同的 localhost 上=14=],所有基于 L3 堆栈的 TCP/IP 协议工作都被浪费了。要比较相对成本(使用 tcp:// transport-class 进行此硬件单一消息传递的附加效果),测试与使用相同inproc:// transport-class,其中 none 协议相关的 TCP/IP-stack 附加处理将发生。

最后但同样重要的是,我的代码永远不会混合不同的事件循环(从 v2.11 开始使用 ZeroMQ,所以有人可能会认为我在避免依赖 async 装饰功能方面有点过时在最近的 py3.6+ 中可用 )

我的代码将使用显式的、非阻塞的、零等待的测试来检测每个aSocketINSTANCE的消息是否存在,如aSocketINSTANCE.poll( zmq.POLLIN, 0 ) 而不是使用任何 "externally" 添加的装饰,它可能报告相同,但通过一些额外的(昂贵且在我的代码控制域之外)事件处理。所有实时、低延迟的用例都力求尽可能减少 latency/overheads,因此在我的项目中使用显式控制总是会胜出,任何 "modern" 语法糖技巧。

无论如何,享受零之禅