asyncio run_until_complete 不等待所有协程完成
asyncio run_until_complete does not wait that all coroutines finish
我正在 Python 迈出我的第一步,我在试图理解为什么我没有得到预期的结果时遇到了一些困难。这是我想要实现的目标:
我有一个消耗 API 的函数。在等待 API 回答时,考虑到我正在通过一个会产生额外延迟的代理,我认为发送并发请求会加快进程(我 运行 100 个并发请求)。确实如此。但是 asyncio run_until_complete
总是 returns 一些未完成的协程。
这里是代码(简化版):
import aiohttp
import asyncio
async def consume_api(parameter):
url = "someurl" #it is actually based on the parameter
try:
async with aiohttp.ClientSession() as session:
async with session.get(URL, proxy="someproxy") as asyncresponse:
r = await asyncresponse.read()
except:
global error_count
error_count += 1
if error_count > 50:
return "Exceeded 50 try on same request"
else:
return consume_api(parameter)
return r.decode("utf-8")
def loop_on_api(list_of_parameter):
loop = asyncio.get_event_loop()
coroutines = [consume_api(list_of_parameter[i]) for i in range(len(list_of_parameter))]
results = loop.run_until_complete(asyncio.gather(*coroutines))
return results
当我 运行 调试器时,loop_on_api
函数返回的 results
包含一个字符串列表,对应于 consume_api
的结果和一些出现的 <coroutine objects consume_api at 0x00...>
。这些变量有一个 cr_running 参数为 False 和一个 cr_Frame。
虽然如果我检查 coroutines
变量,我可以找到所有 100 个协程,但 none 似乎有一个 cr_Frame.
知道我做错了什么吗?
我也在考虑我计算 50 错误 的方法将由所有协程共享。
知道如何具体化吗?
问题似乎出在我使用的代理上,它有时不携带请求或响应。因此强制 re运行 似乎是最好的答案。因此,我现在检查返回的结果是否还有一些协程,并重新运行它们的loop_on_api()
def loop_on_api(list_of_parameter):
loop = asyncio.get_event_loop()
coroutines = [consume_api(list_of_parameter[i]) for i in range(len(list_of_parameter))]
results = loop.run_until_complete(asyncio.gather(*coroutines))
undone = []
rerun_list_of_parameter = []
for i in range(len(results)):
if str(type(results[i])) == "<class 'coroutine'>": #not very elegant >> is there a better way?
undone.append(i)
rerun_list_of_parameter.append(list_of_parameter[i])
if len(undone) > 0:
undone_results = loop_on_api(rerun_list_of_parameter)
for i in range(len(undone_results)):
results[undone[i]] = undone_results[i]
return results
这应该有用,你可以add/change/refactor任何你想要的
import aiohttp
import asyncio
async def consume_api(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.read()
def loop_on_api(list_of_urls):
loop = asyncio.get_event_loop()
coroutines = [consume_api(url) for url in list_of_urls]
results = loop.run_until_complete(asyncio.gather(*coroutines))
return results
if __name__ == '__main__':
print(loop_on_api(['https://google.com', 'https://twitter.com']))
我正在 Python 迈出我的第一步,我在试图理解为什么我没有得到预期的结果时遇到了一些困难。这是我想要实现的目标:
我有一个消耗 API 的函数。在等待 API 回答时,考虑到我正在通过一个会产生额外延迟的代理,我认为发送并发请求会加快进程(我 运行 100 个并发请求)。确实如此。但是 asyncio run_until_complete
总是 returns 一些未完成的协程。
这里是代码(简化版):
import aiohttp
import asyncio
async def consume_api(parameter):
url = "someurl" #it is actually based on the parameter
try:
async with aiohttp.ClientSession() as session:
async with session.get(URL, proxy="someproxy") as asyncresponse:
r = await asyncresponse.read()
except:
global error_count
error_count += 1
if error_count > 50:
return "Exceeded 50 try on same request"
else:
return consume_api(parameter)
return r.decode("utf-8")
def loop_on_api(list_of_parameter):
loop = asyncio.get_event_loop()
coroutines = [consume_api(list_of_parameter[i]) for i in range(len(list_of_parameter))]
results = loop.run_until_complete(asyncio.gather(*coroutines))
return results
当我 运行 调试器时,loop_on_api
函数返回的 results
包含一个字符串列表,对应于 consume_api
的结果和一些出现的 <coroutine objects consume_api at 0x00...>
。这些变量有一个 cr_running 参数为 False 和一个 cr_Frame。
虽然如果我检查 coroutines
变量,我可以找到所有 100 个协程,但 none 似乎有一个 cr_Frame.
知道我做错了什么吗?
我也在考虑我计算 50 错误 的方法将由所有协程共享。
知道如何具体化吗?
问题似乎出在我使用的代理上,它有时不携带请求或响应。因此强制 re运行 似乎是最好的答案。因此,我现在检查返回的结果是否还有一些协程,并重新运行它们的loop_on_api()
def loop_on_api(list_of_parameter):
loop = asyncio.get_event_loop()
coroutines = [consume_api(list_of_parameter[i]) for i in range(len(list_of_parameter))]
results = loop.run_until_complete(asyncio.gather(*coroutines))
undone = []
rerun_list_of_parameter = []
for i in range(len(results)):
if str(type(results[i])) == "<class 'coroutine'>": #not very elegant >> is there a better way?
undone.append(i)
rerun_list_of_parameter.append(list_of_parameter[i])
if len(undone) > 0:
undone_results = loop_on_api(rerun_list_of_parameter)
for i in range(len(undone_results)):
results[undone[i]] = undone_results[i]
return results
这应该有用,你可以add/change/refactor任何你想要的
import aiohttp
import asyncio
async def consume_api(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.read()
def loop_on_api(list_of_urls):
loop = asyncio.get_event_loop()
coroutines = [consume_api(url) for url in list_of_urls]
results = loop.run_until_complete(asyncio.gather(*coroutines))
return results
if __name__ == '__main__':
print(loop_on_api(['https://google.com', 'https://twitter.com']))