pynng: how to setup, and keep using, multiple Contexts on a REP0 socket
I'm working on a "server" thread which takes care of some IO calls for a bunch of "clients".
The communication is done with pynng v0.5.0; the server has its own asyncio loop.
Each client "registers" by sending a first request, then loops receiving results and sending back READY messages.
On the server side, the goal is to treat each client's first message as a registration request, and to create a dedicated worker task which loops doing the IO stuff, sending the result, and waiting for that particular client's READY message.
To implement this, I tried to leverage the Context feature of REP0 sockets.
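For context, here is a minimal sketch of what (as far as I understand) a REP0 context provides, using a made-up inproc address: each context carries its own independent request/reply state on top of a single socket, so several request/reply exchanges can be serviced concurrently.

import asyncio
import pynng

async def serve_one(socket):
    ctx = socket.new_context()   # an independent request/reply state machine on the shared socket
    request = await ctx.arecv()  # one request, from whichever peer sends next
    await ctx.asend(b'reply')    # the reply is routed back to that same peer

async def sketch():
    # Several one-shot handlers can be in flight at once on the same REP0 socket.
    with pynng.Rep0(listen='inproc://sketch') as socket:
        await asyncio.gather(*(serve_one(socket) for _ in range(3)))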
Side notes:
- Although I'm a heavy user of this site, this is my first question :)
- I do know about the PUB/SUB pattern; for self-teaching purposes, I deliberately chose not to use it for this service.
Problem:
After a few iterations, some READY messages get intercepted by the server's registration coroutine instead of being routed to the proper worker task.
Since I cannot share the actual code, I wrote a reproducer for my issue and included it below.
Worse, as you can see in the output, some result messages are sent to the wrong client (ERROR:root:<Worker 1>: worker/client mismatch, exiting.).
It looks like a bug, but I'm not entirely sure I understand how contexts are meant to be used, so any help would be appreciated.
Environment:
- winpython-3.8.2
- pynng v0.5.0+dev (46fbbcb2), with nng v1.3.0 (ff99ee51)
Code:
import asyncio
import logging
import pynng
import threading

NNG_DURATION_INFINITE = -1
ENDPOINT = 'inproc://example_endpoint'


class Server(threading.Thread):
    def __init__(self):
        super(Server, self).__init__()
        self._client_tasks = dict()

    @staticmethod
    async def _worker(ctx, client_id):
        while True:
            # Remember, the first 'receive' has already been done by self._new_client_handler()
            logging.debug(f"<Worker {client_id}>: doing some IO")
            await asyncio.sleep(1)
            logging.debug(f"<Worker {client_id}>: sending the result")
            # I already tried sending synchronously here instead, just in case the issue was related to that
            # (but it's not)
            await ctx.asend(f"result data for client {client_id}".encode())
            logging.debug(f"<Worker {client_id}>: waiting for client READY msg")
            data = await ctx.arecv()
            logging.debug(f"<Worker {client_id}>: received '{data}'")
            if data != bytes([client_id]):
                logging.error(f"<Worker {client_id}>: worker/client mismatch, exiting.")
                return

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            max_workers = 3 + 1  # Try setting it to 3 instead, to stop creating new contexts => now it works fine
            while await asyncio.sleep(0, result=True) and len(self._client_tasks) < max_workers:
                # The issue is here: at some point, the existing client READY messages get
                # intercepted here, instead of being routed to the proper worker context.
                # The intent here was to open a new context only for each *new* client, I was
                # assuming that a 'recv' on older worker contexts would take precedence.
                ctx = socket.new_context()
                data = await ctx.arecv()
                client_id = data[0]
                if client_id in self._client_tasks:
                    logging.error(f"<Server>: We already have a task for client {client_id}")
                    continue  # just let the client block on its 'recv' for now
                logging.debug(f"<Server>: New client : {client_id}")
                self._client_tasks[client_id] = asyncio.create_task(self._worker(ctx, client_id))
            await asyncio.gather(*list(self._client_tasks.values()))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=True)


class Client(threading.Thread):
    def __init__(self, client_id: int):
        super(Client, self).__init__()
        self._id = client_id

    def __repr__(self):
        return f'<Client {self._id}>'

    def run(self):
        with pynng.Req0(dial=ENDPOINT, resend_time=NNG_DURATION_INFINITE) as socket:
            while True:
                logging.debug(f"{self}: READY")
                socket.send(bytes([self._id]))
                data_str = socket.recv().decode()
                logging.debug(f"{self}: received '{data_str}'")
                if data_str != f"result data for client {self._id}":
                    logging.error(f"{self}: client/worker mismatch, exiting.")
                    return


def main():
    logging.basicConfig(level=logging.DEBUG)
    threads = [Server(),
               *[Client(i) for i in range(3)]]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()
Output:
DEBUG:asyncio:Using proactor: IocpProactor
DEBUG:root:<Client 1>: READY
DEBUG:root:<Client 0>: READY
DEBUG:root:<Client 2>: READY
DEBUG:root:<Server>: New client : 1
DEBUG:root:<Worker 1>: doing some IO
DEBUG:root:<Server>: New client : 0
DEBUG:root:<Worker 0>: doing some IO
DEBUG:root:<Server>: New client : 2
DEBUG:root:<Worker 2>: doing some IO
DEBUG:root:<Worker 1>: sending the result
DEBUG:root:<Client 1>: received 'result data for client 1'
DEBUG:root:<Client 1>: READY
ERROR:root:<Server>: We already have a task for client 1
DEBUG:root:<Worker 1>: waiting for client READY msg
DEBUG:root:<Worker 0>: sending the result
DEBUG:root:<Client 0>: received 'result data for client 0'
DEBUG:root:<Client 0>: READY
DEBUG:root:<Worker 0>: waiting for client READY msg
DEBUG:root:<Worker 1>: received 'b'\x00''
ERROR:root:<Worker 1>: worker/client mismatch, exiting.
DEBUG:root:<Worker 2>: sending the result
DEBUG:root:<Client 2>: received 'result data for client 2'
DEBUG:root:<Client 2>: READY
DEBUG:root:<Worker 2>: waiting for client READY msg
ERROR:root:<Server>: We already have a task for client 2
Edit (2020-04-10): I updated pynng and the underlying nng lib to their latest versions (master branches), and the issue is still the same.
After digging into the nng and pynng source code, and confirming my understanding with the maintainer, I can now answer my own question.
When using contexts on a REP0 socket, there is a caveat to be aware of.
As advertised, send/asend() is guaranteed to be routed to the same peer you last received from.
However, the data coming from the next recv/arecv() on that same context is NOT guaranteed to come from that same peer.
In fact, the underlying nng call rep0_ctx_recv() simply reads the next socket pipe that has data available, so there is no guarantee that this data comes from the same peer as the last recv/send pair.
In the reproducer above, I was concurrently waiting on arecv() both on a fresh context (in the Server._new_client_handler() coroutine) and on each worker's context (in the Server._worker() coroutines).
So the behavior I described earlier, where the next request gets "intercepted" by the main coroutine, was simply a race condition.
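In condensed form (this is just an illustration of the race, not the actual nng internals), the problem boils down to two contexts of the same REP0 socket pending on arecv() at the same time: the next incoming message completes whichever pending receive nng picks, regardless of which peer either context last replied to.

async def race_sketch(socket):
    # Both receives below are pending on different contexts of the SAME REP0 socket.
    # When a client sends its next READY message, nothing ties it to a particular
    # context: it may complete either of the two pending arecv() calls.
    worker_ctx = socket.new_context()        # in the real code: the context a worker is waiting on
    registration_ctx = socket.new_context()  # in the real code: the fresh context of the registration loop
    await asyncio.gather(worker_ctx.arecv(), registration_ctx.arecv())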
One solution is therefore to receive only from the Server._new_client_handler() coroutine, and to have each worker handle a single request. Note that in this case the workers are no longer dedicated to a specific peer; if that behavior is needed, the routing of incoming requests has to be handled at the application level (a sketch of such routing follows the code below).
import random  # needed for the variable sleep below (not among the reproducer's original imports)


class Server(threading.Thread):
    @staticmethod
    async def _worker(ctx, data: bytes):
        client_id = int.from_bytes(data, byteorder='big', signed=False)
        logging.debug(f"<Worker {client_id}>: doing some IO")
        await asyncio.sleep(1 + 10 * random.random())
        logging.debug(f"<Worker {client_id}>: sending the result")
        await ctx.asend(f"result data for client {client_id}".encode())

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            while await asyncio.sleep(0, result=True):
                ctx = socket.new_context()
                data = await ctx.arecv()
                asyncio.create_task(self._worker(ctx, data))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=False)
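For completeness, here is a rough sketch (my own illustration, not part of the fix above) of what application-level routing could look like if dedicated per-client workers are still wanted: a single receive loop reads every request on a fresh context, picks the client id out of the payload (the first byte, as in the reproducer), and hands the (context, data) pair to that client's queue; each worker then replies on whatever context its request arrived on and never calls arecv() itself.

import asyncio
import logging
import pynng

ENDPOINT = 'inproc://example_endpoint'

async def dedicated_worker(client_id, queue):
    # Dedicated per-client worker: it never receives on the socket, so it cannot
    # race with the receive loop; it only replies on the context its request came in on.
    while True:
        ctx, data = await queue.get()
        logging.debug(f"<Worker {client_id}>: doing some IO on {data}")
        await asyncio.sleep(1)
        await ctx.asend(f"result data for client {client_id}".encode())

async def serve():
    queues = {}
    with pynng.Rep0(listen=ENDPOINT) as socket:
        while True:
            # Every request is received on a fresh context (contexts are left open
            # here for brevity) and routed by client id to the matching worker.
            ctx = socket.new_context()
            data = await ctx.arecv()
            client_id = data[0]
            if client_id not in queues:
                queues[client_id] = asyncio.Queue()
                asyncio.create_task(dedicated_worker(client_id, queues[client_id]))
            await queues[client_id].put((ctx, data))

# e.g. asyncio.run(serve()) inside the server thread, as in the reproducer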