asyncio run_until_complete 不等待所有协程完成

asyncio run_until_complete does not wait that all coroutines finish

我正在 Python 迈出我的第一步,我在试图理解为什么我没有得到预期的结果时遇到了一些困难。这是我想要实现的目标:

我有一个消耗 API 的函数。在等待 API 回答时,考虑到我正在通过一个会产生额外延迟的代理,我认为发送并发请求会加快进程(我 运行 100 个并发请求)。确实如此。但是 asyncio run_until_complete 总是 returns 一些未完成的协程。

这里是代码(简化版):

import aiohttp
import asyncio
    
async def consume_api(parameter):
    url = "someurl" #it is actually based on the parameter
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(URL, proxy="someproxy") as asyncresponse:
                r = await asyncresponse.read()
    except:
        global error_count 
        error_count += 1
        if error_count > 50:
            return "Exceeded 50 try on same request"
        else:
            return consume_api(parameter)
    return r.decode("utf-8") 

def loop_on_api(list_of_parameter):
    loop = asyncio.get_event_loop()

    coroutines = [consume_api(list_of_parameter[i]) for i in range(len(list_of_parameter))]
    results = loop.run_until_complete(asyncio.gather(*coroutines))
    return results

当我 运行 调试器时,loop_on_api 函数返回的 results 包含一个字符串列表,对应于 consume_api 的结果和一些出现的 <coroutine objects consume_api at 0x00...>。这些变量有一个 cr_running 参数为 False 和一个 cr_Frame。 虽然如果我检查 coroutines 变量,我可以找到所有 100 个协程,但 none 似乎有一个 cr_Frame.

知道我做错了什么吗?

我也在考虑我计算 50 错误 的方法将由所有协程共享。

知道如何具体化吗?

问题似乎出在我使用的代理上,它有时不携带请求或响应。因此强制 re运行 似乎是最好的答案。因此,我现在检查返回的结果是否还有一些协程,并重新运行它们的loop_on_api()

def loop_on_api(list_of_parameter):
    loop = asyncio.get_event_loop()

    coroutines = [consume_api(list_of_parameter[i]) for i in range(len(list_of_parameter))]
    results = loop.run_until_complete(asyncio.gather(*coroutines))

    undone = []
    rerun_list_of_parameter = []
    
    for i in range(len(results)):
        if str(type(results[i])) == "<class 'coroutine'>": #not very elegant >> is there a better way?
            undone.append(i)
            rerun_list_of_parameter.append(list_of_parameter[i])

    if len(undone) > 0:
        undone_results = loop_on_api(rerun_list_of_parameter)
        for i in range(len(undone_results)):
            results[undone[i]] = undone_results[i]

    return results

这应该有用,你可以add/change/refactor任何你想要的

import aiohttp
import asyncio


async def consume_api(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.read()


def loop_on_api(list_of_urls):
    loop = asyncio.get_event_loop()

    coroutines = [consume_api(url) for url in list_of_urls]

    results = loop.run_until_complete(asyncio.gather(*coroutines))
    return results


if __name__ == '__main__':
    print(loop_on_api(['https://google.com', 'https://twitter.com']))