通过 websocket 从 Starlette 中的同步迭代器发送数据
Send data via websocket from synchronous iterator in Starlette
我有一个来自第三方包的同步迭代器。迭代器查询外部服务并产生一些数据。如果没有数据,迭代器将等待它。我将 Starlette 的 WebSocketEndpoint
子类化,以通过 websocket 从迭代器发送新数据。不幸的是,似乎我不明白什么,我的代码没有按预期工作。这是一个稍微简化的代码:
import time
from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket
class Iterator:
"""This is a third-party object, not asynchronous at all."""
def __init__(self):
self._stop = False
def __iter__(self):
self.i = 0
return self
def __next__(self):
if self._stop:
raise StopIteration
time.sleep(5)
self.i += 1
print(self.i)
return self.i
def cancel(self):
self._stop = True
class MyWebSocket(WebSocketEndpoint):
def __init__(self, scope, receive, send) -> None:
super().__init__(scope, receive, send)
self.iterator = Iterator()
async def on_connect(self, websocket: WebSocket) -> None:
await super().on_connect(websocket)
for message in self.iterator:
await websocket.send_json({"message": message})
async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
await super().on_disconnect(websocket, close_code)
self.iterator.cancel()
第一个问题 - 代码不通过 websocket 发送任何数据。 print 语句表明,迭代器生成数据,但实际上没有发送任何数据。如果我将 return
放在 websocket.send_json()
之后,它将正确发送迭代器的第一个结果,但循环将在之后完成。为什么?
另一个问题是迭代器完全阻塞了应用程序的执行。我明白为什么会发生这种情况,但由于它是一个 Web 服务,并且迭代器旨在工作直到客户端与 websocket 断开连接,它可以很容易地阻止我的整个应用程序。如果我有 10 个工作人员,则 10 个 websocket 客户端将阻止该应用程序,并且在其中一个 websocket 断开连接之前无法执行任何操作。我该如何解决?
This is a third-party object, not asynchronous at all.
问题就在这里——asyncio 是单线程的,所以你的迭代器要么根本不阻塞(就像迭代内置集合时一样),要么你必须使用 async iterator 和async for
循环将在等待下一项时暂停执行。
在处理第三方阻塞函数时,您可以使用run_in_executor
将其合并到异步代码中,这会将函数提交到线程池并暂停当前协程,直到函数完成。您不能将迭代器直接传递给 run_in_executor
,但您可以创建一个包装器,它采用同步迭代器并通过 run_in_executor
运行每个单独的 __next__
调用,提供异步迭代器的接口。例如:
async def wrap_iter(iterable):
loop = asyncio.get_event_loop()
it = iter(iterable)
DONE = object()
def get_next_item():
# Get the next item synchronously. We cannot call next(it)
# directly because StopIteration cannot be transferred
# across an "await". So we detect StopIteration and
# convert it to a sentinel object.
try:
return next(it)
except StopIteration:
return DONE
while True:
# Submit execution of next(it) to another thread and resume
# when it's done. await will suspend the coroutine and
# allow other tasks to execute while waiting.
next_item = await loop.run_in_executor(None, get_next_item)
if next_item is DONE:
break
yield next_item
现在您可以将 for message in self.iterator
替换为 async for message in wrap_iter(self.iterator)
,一切正常。
我有一个来自第三方包的同步迭代器。迭代器查询外部服务并产生一些数据。如果没有数据,迭代器将等待它。我将 Starlette 的 WebSocketEndpoint
子类化,以通过 websocket 从迭代器发送新数据。不幸的是,似乎我不明白什么,我的代码没有按预期工作。这是一个稍微简化的代码:
import time
from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket
class Iterator:
"""This is a third-party object, not asynchronous at all."""
def __init__(self):
self._stop = False
def __iter__(self):
self.i = 0
return self
def __next__(self):
if self._stop:
raise StopIteration
time.sleep(5)
self.i += 1
print(self.i)
return self.i
def cancel(self):
self._stop = True
class MyWebSocket(WebSocketEndpoint):
def __init__(self, scope, receive, send) -> None:
super().__init__(scope, receive, send)
self.iterator = Iterator()
async def on_connect(self, websocket: WebSocket) -> None:
await super().on_connect(websocket)
for message in self.iterator:
await websocket.send_json({"message": message})
async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
await super().on_disconnect(websocket, close_code)
self.iterator.cancel()
第一个问题 - 代码不通过 websocket 发送任何数据。 print 语句表明,迭代器生成数据,但实际上没有发送任何数据。如果我将 return
放在 websocket.send_json()
之后,它将正确发送迭代器的第一个结果,但循环将在之后完成。为什么?
另一个问题是迭代器完全阻塞了应用程序的执行。我明白为什么会发生这种情况,但由于它是一个 Web 服务,并且迭代器旨在工作直到客户端与 websocket 断开连接,它可以很容易地阻止我的整个应用程序。如果我有 10 个工作人员,则 10 个 websocket 客户端将阻止该应用程序,并且在其中一个 websocket 断开连接之前无法执行任何操作。我该如何解决?
This is a third-party object, not asynchronous at all.
问题就在这里——asyncio 是单线程的,所以你的迭代器要么根本不阻塞(就像迭代内置集合时一样),要么你必须使用 async iterator 和async for
循环将在等待下一项时暂停执行。
在处理第三方阻塞函数时,您可以使用run_in_executor
将其合并到异步代码中,这会将函数提交到线程池并暂停当前协程,直到函数完成。您不能将迭代器直接传递给 run_in_executor
,但您可以创建一个包装器,它采用同步迭代器并通过 run_in_executor
运行每个单独的 __next__
调用,提供异步迭代器的接口。例如:
async def wrap_iter(iterable):
loop = asyncio.get_event_loop()
it = iter(iterable)
DONE = object()
def get_next_item():
# Get the next item synchronously. We cannot call next(it)
# directly because StopIteration cannot be transferred
# across an "await". So we detect StopIteration and
# convert it to a sentinel object.
try:
return next(it)
except StopIteration:
return DONE
while True:
# Submit execution of next(it) to another thread and resume
# when it's done. await will suspend the coroutine and
# allow other tasks to execute while waiting.
next_item = await loop.run_in_executor(None, get_next_item)
if next_item is DONE:
break
yield next_item
现在您可以将 for message in self.iterator
替换为 async for message in wrap_iter(self.iterator)
,一切正常。