在一些协程完成后添加一个协程,在一个已经 运行 的事件循环中(相同的循环,相同的线程)

Add a coroutine after some coroutines have finished, in an already running event loop (same loop, same thread)

我已经阅读了一些关于 SO 的类似问题,为了将协程添加到已经 运行 的事件循环中,一些答案是针对所问的问题量身定制的,因此并不完全适用于我的情况;最常见的是 asyncio.ensure_future(coro(), loop=my_running_loop),或者对于线程安全版本,它将是 asyncio.run_coroutine_threadsafe(coro(), my_running_loop);最后的手段是在当前循环结束后产生另一个循环。

我先提出我的问题:

  1. 为什么前两种方法不起作用?
  2. 除了第三种方法,是否还有另一种方法,最好在 class 中完成,这样我就不必在 class 固有的循环完成后开始另一个循环?

我的场景的一个简化示例是 REST-API 的另一端有 1000 个项目,我必须将它们全部拉出来进行一些分析。但是 REST-API 取决于网络连接,因此间歇性地我可能会有一些项目只是超时错误(或空);在这种情况下,我必须再次调用 REST-API 来获取那些失败的。

只是为了演示我的场景,没有所有粒度(特定于数据而不是代码逻辑):

import asyncio
import ItemType  # Enum
import async_rest_api
import pandas as pd

class DataSource:
    def __init__(self):
        # some settings on self
        # particularly:
        # self.loop = asyncio.get_event_loop()
        # self.item_names_to_process = some_df  # pd.DataFrame of one column named 'name'
        # self.unprocessed_item_names = []
        # self.result = pd.DataFrame()
        # super().__init__()

    def get_item_fetching_function(item_type: ItemType):
        if item_type == ItemType.One:
           return async_rest_api.get_item_type_one
        if item_type == ItemType.Two:
           return async_rest_api.get_item_type_two
        if item_type == ItemType.Three:
           return async_rest_api.get_item_type_three

    # args were of length greater than one, 
    # here I simplified it to just item_type and item_names.    
    async def get_items(self, item_type, item_names):  
        step_size = 500
        results = []

        for i in range(0, len(item_names), step_size):
            tasks = []
            for name in item_names[i:i+step_size]:
                tasks.append(self.get_item_fetching_function(item_type)(name))
            results.extend(await asyncio.gather(*tasks, return_exception=True))


        result = pd.concat([r[1] for r in results])
        processed_items_names = pd.DataFrame(results.name.unique(), columns=['name'])
        unprocessed = self.item_names_to_process.append(processed_items_names).drop_duplicates(keep=False)
        self.unprocessed = unprocessed.name.tolist()
        self.result = pd.concat([self.result, result])
        
        # Trial: I wrote another line here to add 
        # a coroutine to process the unprocessed items. 
        # I tried both in-thread and cross-thread, 
        # neither have worked and no error was thrown; 
        # the code just finished silently.

        # in the running thread

        asyncio.ensure_future(self.get_items(item_type, self.unprocessed), loop=self.loop)

        # or in another thread
   
        asyncio.run_coroutine_threadsafe(self.get_items(item_type, self.unprocessed), self.loop)


    def wrapper_function(self, item_type):
        self.loop.run_until_complete(self.get_items(item_type, self.item_names_to_process))
    
    if __names__ == "__main__":
         import DataSource
         import ItemType
         import asyncio
         data_source = DataSource()
         names = ''.join(random.choices(string.ascii_uppercase + string.digits, k=1000))
         data_source.wrapper_function(ItemType.One)
         # Trial: the last resort would be to check on the `self.unprocessed` here, if it has values, I'll re-run `self.get_items`. This method worked, as expected.
         if len(data_source.unprocessed):
            asyncio.run(data_source.get_items(ItemType.One, data_source.unprocessed))
         

只需将 asyncio.gather 替换为您可以更好控制的结构。

是这样的:当你调用一个协程函数时,你会得到一个协程对象。这个对象可以

  • (1) 直接等待:您的代码暂停并以线性方式等待其完成:因此这不适合并行启动任务。
  • (2) 被提升为“任务”,它们,每当您的代码屈服于 运行 循环时,它将逐步执行所有现有任务,然后再回到您的代码。

asyncio.gather 的事情是它做了两件事:将所有不是任务的对象提升为任务,然后 等待 让它们全部完成完成,等待 gather 调用本身。

您可以保持相同的设计,只需将行 results.extend(await asyncio.gather(*tasks, return_exception=True)) 移动到 i 的循环内(并在 name 的循环下方):等待一整批“step_size”大小的任务,直到它们结束 - 或者,您可以只显式创建任务,并使用 asyncio.wait (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) 在解析器中检索结果,控制在添加下一批任务之前暂停的时间。 (这需要稍微巧妙地重新设计 - 只需为每个 setp_size 批次调用 gather 会容易得多)