在一些协程完成后添加一个协程,在一个已经 运行 的事件循环中(相同的循环,相同的线程)
Add a coroutine after some coroutines have finished, in an already running event loop (same loop, same thread)
我已经阅读了一些关于 SO 的类似问题,为了将协程添加到已经 运行 的事件循环中,一些答案是针对所问的问题量身定制的,因此并不完全适用于我的情况;最常见的是 asyncio.ensure_future(coro(), loop=my_running_loop)
,或者对于线程安全版本,它将是 asyncio.run_coroutine_threadsafe(coro(), my_running_loop)
;最后的手段是在当前循环结束后产生另一个循环。
我先提出我的问题:
- 为什么前两种方法不起作用?
- 除了第三种方法,是否还有另一种方法,最好在 class 中完成,这样我就不必在 class 固有的循环完成后开始另一个循环?
我的场景的一个简化示例是 REST-API 的另一端有 1000 个项目,我必须将它们全部拉出来进行一些分析。但是 REST-API 取决于网络连接,因此间歇性地我可能会有一些项目只是超时错误(或空);在这种情况下,我必须再次调用 REST-API 来获取那些失败的。
只是为了演示我的场景,没有所有粒度(特定于数据而不是代码逻辑):
import asyncio
import ItemType # Enum
import async_rest_api
import pandas as pd
class DataSource:
def __init__(self):
# some settings on self
# particularly:
# self.loop = asyncio.get_event_loop()
# self.item_names_to_process = some_df # pd.DataFrame of one column named 'name'
# self.unprocessed_item_names = []
# self.result = pd.DataFrame()
# super().__init__()
def get_item_fetching_function(item_type: ItemType):
if item_type == ItemType.One:
return async_rest_api.get_item_type_one
if item_type == ItemType.Two:
return async_rest_api.get_item_type_two
if item_type == ItemType.Three:
return async_rest_api.get_item_type_three
# args were of length greater than one,
# here I simplified it to just item_type and item_names.
async def get_items(self, item_type, item_names):
step_size = 500
results = []
for i in range(0, len(item_names), step_size):
tasks = []
for name in item_names[i:i+step_size]:
tasks.append(self.get_item_fetching_function(item_type)(name))
results.extend(await asyncio.gather(*tasks, return_exception=True))
result = pd.concat([r[1] for r in results])
processed_items_names = pd.DataFrame(results.name.unique(), columns=['name'])
unprocessed = self.item_names_to_process.append(processed_items_names).drop_duplicates(keep=False)
self.unprocessed = unprocessed.name.tolist()
self.result = pd.concat([self.result, result])
# Trial: I wrote another line here to add
# a coroutine to process the unprocessed items.
# I tried both in-thread and cross-thread,
# neither have worked and no error was thrown;
# the code just finished silently.
# in the running thread
asyncio.ensure_future(self.get_items(item_type, self.unprocessed), loop=self.loop)
# or in another thread
asyncio.run_coroutine_threadsafe(self.get_items(item_type, self.unprocessed), self.loop)
def wrapper_function(self, item_type):
self.loop.run_until_complete(self.get_items(item_type, self.item_names_to_process))
if __names__ == "__main__":
import DataSource
import ItemType
import asyncio
data_source = DataSource()
names = ''.join(random.choices(string.ascii_uppercase + string.digits, k=1000))
data_source.wrapper_function(ItemType.One)
# Trial: the last resort would be to check on the `self.unprocessed` here, if it has values, I'll re-run `self.get_items`. This method worked, as expected.
if len(data_source.unprocessed):
asyncio.run(data_source.get_items(ItemType.One, data_source.unprocessed))
只需将 asyncio.gather
替换为您可以更好控制的结构。
是这样的:当你调用一个协程函数时,你会得到一个协程对象。这个对象可以
- (1) 直接等待:您的代码暂停并以线性方式等待其完成:因此这不适合并行启动任务。
- (2) 被提升为“任务”,它们,每当您的代码屈服于 运行 循环时,它将逐步执行所有现有任务,然后再回到您的代码。
asyncio.gather
的事情是它做了两件事:将所有不是任务的对象提升为任务,然后 等待 让它们全部完成完成,等待 gather
调用本身。
您可以保持相同的设计,只需将行 results.extend(await asyncio.gather(*tasks, return_exception=True))
移动到 i
的循环内(并在 name
的循环下方):等待一整批“step_size”大小的任务,直到它们结束 -
或者,您可以只显式创建任务,并使用 asyncio.wait
(https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) 在解析器中检索结果,控制在添加下一批任务之前暂停的时间。 (这需要稍微巧妙地重新设计 - 只需为每个 setp_size 批次调用 gather
会容易得多)
我已经阅读了一些关于 SO 的类似问题,为了将协程添加到已经 运行 的事件循环中,一些答案是针对所问的问题量身定制的,因此并不完全适用于我的情况;最常见的是 asyncio.ensure_future(coro(), loop=my_running_loop)
,或者对于线程安全版本,它将是 asyncio.run_coroutine_threadsafe(coro(), my_running_loop)
;最后的手段是在当前循环结束后产生另一个循环。
我先提出我的问题:
- 为什么前两种方法不起作用?
- 除了第三种方法,是否还有另一种方法,最好在 class 中完成,这样我就不必在 class 固有的循环完成后开始另一个循环?
我的场景的一个简化示例是 REST-API 的另一端有 1000 个项目,我必须将它们全部拉出来进行一些分析。但是 REST-API 取决于网络连接,因此间歇性地我可能会有一些项目只是超时错误(或空);在这种情况下,我必须再次调用 REST-API 来获取那些失败的。
只是为了演示我的场景,没有所有粒度(特定于数据而不是代码逻辑):
import asyncio
import ItemType # Enum
import async_rest_api
import pandas as pd
class DataSource:
def __init__(self):
# some settings on self
# particularly:
# self.loop = asyncio.get_event_loop()
# self.item_names_to_process = some_df # pd.DataFrame of one column named 'name'
# self.unprocessed_item_names = []
# self.result = pd.DataFrame()
# super().__init__()
def get_item_fetching_function(item_type: ItemType):
if item_type == ItemType.One:
return async_rest_api.get_item_type_one
if item_type == ItemType.Two:
return async_rest_api.get_item_type_two
if item_type == ItemType.Three:
return async_rest_api.get_item_type_three
# args were of length greater than one,
# here I simplified it to just item_type and item_names.
async def get_items(self, item_type, item_names):
step_size = 500
results = []
for i in range(0, len(item_names), step_size):
tasks = []
for name in item_names[i:i+step_size]:
tasks.append(self.get_item_fetching_function(item_type)(name))
results.extend(await asyncio.gather(*tasks, return_exception=True))
result = pd.concat([r[1] for r in results])
processed_items_names = pd.DataFrame(results.name.unique(), columns=['name'])
unprocessed = self.item_names_to_process.append(processed_items_names).drop_duplicates(keep=False)
self.unprocessed = unprocessed.name.tolist()
self.result = pd.concat([self.result, result])
# Trial: I wrote another line here to add
# a coroutine to process the unprocessed items.
# I tried both in-thread and cross-thread,
# neither have worked and no error was thrown;
# the code just finished silently.
# in the running thread
asyncio.ensure_future(self.get_items(item_type, self.unprocessed), loop=self.loop)
# or in another thread
asyncio.run_coroutine_threadsafe(self.get_items(item_type, self.unprocessed), self.loop)
def wrapper_function(self, item_type):
self.loop.run_until_complete(self.get_items(item_type, self.item_names_to_process))
if __names__ == "__main__":
import DataSource
import ItemType
import asyncio
data_source = DataSource()
names = ''.join(random.choices(string.ascii_uppercase + string.digits, k=1000))
data_source.wrapper_function(ItemType.One)
# Trial: the last resort would be to check on the `self.unprocessed` here, if it has values, I'll re-run `self.get_items`. This method worked, as expected.
if len(data_source.unprocessed):
asyncio.run(data_source.get_items(ItemType.One, data_source.unprocessed))
只需将 asyncio.gather
替换为您可以更好控制的结构。
是这样的:当你调用一个协程函数时,你会得到一个协程对象。这个对象可以
- (1) 直接等待:您的代码暂停并以线性方式等待其完成:因此这不适合并行启动任务。
- (2) 被提升为“任务”,它们,每当您的代码屈服于 运行 循环时,它将逐步执行所有现有任务,然后再回到您的代码。
asyncio.gather
的事情是它做了两件事:将所有不是任务的对象提升为任务,然后 等待 让它们全部完成完成,等待 gather
调用本身。
您可以保持相同的设计,只需将行 results.extend(await asyncio.gather(*tasks, return_exception=True))
移动到 i
的循环内(并在 name
的循环下方):等待一整批“step_size”大小的任务,直到它们结束 -
或者,您可以只显式创建任务,并使用 asyncio.wait
(https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) 在解析器中检索结果,控制在添加下一批任务之前暂停的时间。 (这需要稍微巧妙地重新设计 - 只需为每个 setp_size 批次调用 gather
会容易得多)