pynng:如何在 REP0 套接字上设置并继续使用多个上下文

pynng: how to setup, and keep using, multiple Contexts on a REP0 socket

我正在开发一个 "server" 线程,它负责一些 "clients".

的 IO 调用

通信是使用 pynng v0.5.0, the server has its own asyncio 循环完成的。

每个客户端 "registers" 通过发送第一个请求,然后循环接收结果并发回 READY 消息。

在服务器上,目标是将每个客户端的第一条消息视为注册请求,并创建一个专用的工作任务,该任务将循环执行 IO 操作、发送结果并等待该客户端的 READY 消息特定客户。

为了实现这一点,我尝试利用 REP0 套接字的 Context 特性。

旁注

问题:

经过几次迭代后,一些 READY 消息被服务器的注册协程拦截,而不是被路由到适当的工作任务。

由于我无法分享代码,我为我的问题写了一个复制器并将其包含在下面。

更糟糕的是,正如您在输出中看到的那样,一些结果消息被发送到错误的客户端 (ERROR:root:<Worker 1>: worker/client mismatch, exiting.)。

它看起来像一个错误,但我不完全确定我是否理解如何正确使用上下文,因此我们将不胜感激。

环境:

代码:

import asyncio
import logging
import pynng
import threading

NNG_DURATION_INFINITE = -1
ENDPOINT = 'inproc://example_endpoint'


class Server(threading.Thread):
    def __init__(self):
        super(Server, self).__init__()
        self._client_tasks = dict()

    @staticmethod
    async def _worker(ctx, client_id):
        while True:
            # Remember, the first 'receive' has already been done by self._new_client_handler()

            logging.debug(f"<Worker {client_id}>: doing some IO")
            await asyncio.sleep(1)

            logging.debug(f"<Worker {client_id}>: sending the result")
            # I already tried sending synchronously here instead, just in case the issue was related to that
            # (but it's not)
            await ctx.asend(f"result data for client {client_id}".encode())

            logging.debug(f"<Worker {client_id}>: waiting for client READY msg")
            data = await ctx.arecv()
            logging.debug(f"<Worker {client_id}>: received '{data}'")
            if data != bytes([client_id]):
                logging.error(f"<Worker {client_id}>: worker/client mismatch, exiting.")
                return

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            max_workers = 3 + 1  # Try setting it to 3 instead, to stop creating new contexts => now it works fine
            while await asyncio.sleep(0, result=True) and len(self._client_tasks) < max_workers:
                # The issue is here: at some point, the existing client READY messages get
                # intercepted here, instead of being routed to the proper worker context.
                # The intent here was to open a new context only for each *new* client, I was
                # assuming that a 'recv' on older worker contexts would take precedence.
                ctx = socket.new_context()
                data = await ctx.arecv()
                client_id = data[0]

                if client_id in self._client_tasks:
                    logging.error(f"<Server>: We already have a task for client {client_id}")
                    continue  # just let the client block on its 'recv' for now

                logging.debug(f"<Server>: New client : {client_id}")
                self._client_tasks[client_id] = asyncio.create_task(self._worker(ctx, client_id))

            await asyncio.gather(*list(self._client_tasks.values()))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=True)


class Client(threading.Thread):
    def __init__(self, client_id: int):
        super(Client, self).__init__()
        self._id = client_id

    def __repr__(self):
        return f'<Client {self._id}>'

    def run(self):
        with pynng.Req0(dial=ENDPOINT, resend_time=NNG_DURATION_INFINITE) as socket:
            while True:
                logging.debug(f"{self}: READY")
                socket.send(bytes([self._id]))
                data_str = socket.recv().decode()
                logging.debug(f"{self}: received '{data_str}'")
                if data_str != f"result data for client {self._id}":
                    logging.error(f"{self}: client/worker mismatch, exiting.")
                    return


def main():
    logging.basicConfig(level=logging.DEBUG)
    threads = [Server(),
               *[Client(i) for i in range(3)]]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

输出:

DEBUG:asyncio:Using proactor: IocpProactor
DEBUG:root:<Client 1>: READY
DEBUG:root:<Client 0>: READY
DEBUG:root:<Client 2>: READY
DEBUG:root:<Server>: New client : 1
DEBUG:root:<Worker 1>: doing some IO
DEBUG:root:<Server>: New client : 0
DEBUG:root:<Worker 0>: doing some IO
DEBUG:root:<Server>: New client : 2
DEBUG:root:<Worker 2>: doing some IO
DEBUG:root:<Worker 1>: sending the result
DEBUG:root:<Client 1>: received 'result data for client 1'
DEBUG:root:<Client 1>: READY
ERROR:root:<Server>: We already have a task for client 1
DEBUG:root:<Worker 1>: waiting for client READY msg
DEBUG:root:<Worker 0>: sending the result
DEBUG:root:<Client 0>: received 'result data for client 0'
DEBUG:root:<Client 0>: READY
DEBUG:root:<Worker 0>: waiting for client READY msg
DEBUG:root:<Worker 1>: received 'b'\x00''
ERROR:root:<Worker 1>: worker/client mismatch, exiting.
DEBUG:root:<Worker 2>: sending the result
DEBUG:root:<Client 2>: received 'result data for client 2'
DEBUG:root:<Client 2>: READY
DEBUG:root:<Worker 2>: waiting for client READY msg
ERROR:root:<Server>: We already have a task for client 2

编辑 (2020-04-10): 将 pynng 和底层 nng.lib 更新到最新版本(master 分支),仍然是同样的问题。

在深入了解 nng 和 pynng 的源代码并与维护者确认我的理解后,我现在可以回答我自己的问题了。

在 REP0 套接字上使用上下文时,需要注意一些事项。

正如所宣传的那样,send/asend() 保证路由到您上次接收的同一个对等点。

然而,在同一上下文中来自下一个 recv/arecv() 的数据不能保证来自同一对等方。

实际上,对 rep0_ctx_recv() 的底层 nng 调用仅读取具有可用数据的下一个套接字管道,因此不能保证所述数据来自与最后一个 recv/send 对相同的对等点。

在上面的复制器中,我在新上下文(在 Server._new_client_handler() 协程中)和每个工作上下文(在Server._worker() 协程)。

所以我之前描述的主协程的下一个请求 "intercepted" 只是一个竞争条件。

一种解决方案是只从 Server._new_client_handler() 协同程序接收,并且让工作人员只处理一个请求。请注意,在这种情况下,工作人员不再专用于特定的对等方。如果需要此行为,传入请求的路由必须在应用程序级别处理。

class Server(threading.Thread):
    @staticmethod
    async def _worker(ctx, data: bytes):
        client_id = int.from_bytes(data, byteorder='big', signed=False)
        logging.debug(f"<Worker {client_id}>: doing some IO")
        await asyncio.sleep(1 + 10 * random.random())

        logging.debug(f"<Worker {client_id}>: sending the result")
        await ctx.asend(f"result data for client {client_id}".encode())

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            while await asyncio.sleep(0, result=True):
                ctx = socket.new_context()
                data = await ctx.arecv()
                asyncio.create_task(self._worker(ctx, data))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=False)