Run code after asyncio run_until_complete() statement has finished
I'm new to asyncio, and I've managed to perform some requests with it. I created a function fetch_all() that receives the list of queries (URLs) and the loop previously created with asyncio as arguments, and calls a function fetch() to get the result of each query in JSON format:
import aiohttp
import asyncio
import ssl
import nest_asyncio

nest_asyncio.apply()

async def fetch(session, url):
    async with session.get(url, ssl=ssl.SSLContext()) as response:
        return await response.json()

async def fetch_all(urls, loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(fetch_all(queries, loop))
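(For this snippet to run, queries must be a list of URL strings defined beforehand. The actual list is not shown in the question, so the one below is purely illustrative:)

queries = [
    'https://httpbin.org/get',
    'https://httpbin.org/json',
]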
This works fine, and I get the query results in results as a list of JSONs (dictionaries). But here comes my problem: sometimes, instead of the JSON, some of the results are errors (RuntimeError, aiohttp.client_exceptions.ClientConnectorError, etc.). I assume these are one-off errors, because if I redo the query individually I get the desired result. So I came up with a while loop to check which results are not dictionaries and redo their queries: I initialize repeat_queries, error_index and results with the queries and their indexes, and apply run_until_complete(). Then I save each result that is a dictionary and update the list of remaining queries and their indexes:
repeat_queries = queries
error_index = list(range(len(repeat_queries)))
results = error_index

while error_index:
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        repeat_results = loop.run_until_complete(fetch_all(repeat_queries, loop))
    for i, rr in zip(error_index, repeat_results):
        results[i] = rr
    error_index = [i for i in range(len(results)) if not isinstance(results[i], dict)]
    repeat_queries = [repeat_queries[i] for i in error_index]
However, since the asyncio loop is asynchronous, the error_index and repeat_queries updates are executed before run_until_complete() has finished, so the loop keeps re-running queries that were already executed in previous iterations, which leads to an (almost) infinite while loop.

Therefore, my question is: is there any way to force some code to be executed only after loop.run_until_complete() has finished?

I have seen some similar questions on Stack Overflow, but I could not apply any of their answers.
I would do this differently.

I would run the try/except loop inside fetch() to catch the exception and repeat the request there. Since some problems may never give a result, a while loop could run forever, so I would rather use for _ in range(3) to try only three times. I would also return the url from fetch, so it is easier to tell which urls did not give a result.
import aiohttp
import asyncio
import ssl

async def fetch(session, url):
    exception = None
    for number in range(3):  # try only 3 times
        try:
            async with session.get(url, ssl=ssl.SSLContext()) as response:
                data = await response.json()
                #print('data:', data)
                return url, data
        except Exception as ex:
            print('[ERROR] {} | {} | {}'.format(url, number+1, ex))
            exception = ex
    return url, exception

async def fetch_all(urls, loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)

queries = [
    'https://httpbin.org/get',
    'https://toscrape.com',
    'https://fake.domain/'
]

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(fetch_all(queries, loop))
    #print(results)

    print('--- results ---')
    for url, result in results:
        print('url:', url)
        print('result:', result)
        print('is dict:', isinstance(result, dict))
        print('type:', type(result))
        print('---')
Result:
[ERROR] https://fake.domain/ | 1 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc2c0> [Name or service not known]
[ERROR] https://fake.domain/ | 2 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc440> [Name or service not known]
[ERROR] https://fake.domain/ | 3 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc9c0> [Name or service not known]
[ERROR] https://toscrape.com | 1 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('https://toscrape.com')
[ERROR] https://toscrape.com | 2 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('https://toscrape.com')
[ERROR] https://toscrape.com | 3 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('https://toscrape.com')
--- results ---
url: https://httpbin.org/get
result: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.7.4.post0', 'X-Amzn-Trace-Id': 'Root=1-60e5c00e-45aae85e78277e5122b262c9'}, 'origin': '83.11.175.159', 'url': 'https://httpbin.org/get'}
is dict: True
type: <class 'dict'>
---
url: https://toscrape.com
result: 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('https://toscrape.com')
is dict: False
type: <class 'aiohttp.client_exceptions.ContentTypeError'>
---
url: https://fake.domain/
result: Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc9c0> [Name or service not known]
is dict: False
type: <class 'aiohttp.client_exceptions.ClientConnectorError'>
---
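If the errors really are one-off, it may also help to wait a moment before retrying instead of firing the next attempt immediately. Below is a minimal sketch of a drop-in replacement for the fetch() above (same imports); the retries and delay parameters and the one-second value are my own assumptions, not part of the original code:

async def fetch(session, url, retries=3, delay=1.0):
    exception = None
    for number in range(retries):
        try:
            async with session.get(url, ssl=ssl.SSLContext()) as response:
                return url, await response.json()
        except Exception as ex:
            print('[ERROR] {} | {} | {}'.format(url, number+1, ex))
            exception = ex
            if number < retries - 1:
                await asyncio.sleep(delay)  # assumed fixed pause before the next attempt; tune as needed
    return url, exception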
EDIT:

A version using your approach of looping over run_until_complete, but I would do everything in a single for loop, and I would use for _ in range(3) to repeat it only three times. This works, but the previous version seems simpler.
import aiohttp
import asyncio
import ssl

async def fetch(session, url):
    async with session.get(url, ssl=ssl.SSLContext()) as response:
        return await response.json()

async def fetch_all(urls, loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)

queries = [
    'https://httpbin.org/get',
    'https://httpbin.org/json',
    'https://toscrape.com',
    'https://fake.domain/'
]

if __name__ == '__main__':
    # the loop only needs to be created once
    loop = asyncio.get_event_loop()

    # all the original queries
    all_queries = queries

    # placeholders for all the results
    all_results = [None] * len(all_queries)

    # indexes selected at the start
    indexes = list(range(len(all_queries)))

    for number in range(3):
        # currently selected queries
        queries = [all_queries[idx] for idx in indexes]

        # results for the selected queries
        results = loop.run_until_complete(fetch_all(queries, loop))

        print('\n--- try:', number+1, '--- results:', len(results), '---\n')

        new_indexes = []
        for idx, url, result in zip(indexes, queries, results):
            all_results[idx] = result
            if not isinstance(result, dict):
                new_indexes.append(idx)
            print('url:', url)
            print('result:', result)
            print('is dict:', isinstance(result, dict))
            print('type:', type(result))
            print('---')

        # indexes remaining after filtering out the correct results
        indexes = new_indexes
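As a side note, newer versions deprecate both aiohttp.ClientSession(loop=...) and calling asyncio.get_event_loop() outside a running loop; on Python 3.7+ the same flow can be driven with asyncio.run(), which creates and closes the loop for you. A minimal sketch, assuming the fetch() defined above:

import aiohttp
import asyncio
import ssl

async def fetch(session, url):
    async with session.get(url, ssl=ssl.SSLContext()) as response:
        return await response.json()

async def main(urls):
    # the session picks up the running loop automatically; no loop argument needed
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[fetch(session, url) for url in urls],
                                    return_exceptions=True)

if __name__ == '__main__':
    results = asyncio.run(main(['https://httpbin.org/get']))
    print(results)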