How to handle exceptions when using asyncio.wait with FIRST_COMPLETED?
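Is there any way to handle exceptions when using asyncio.wait(coro_obj, return_when=FIRST_COMPLETED)?
I am trying to get my IP from several external services, but I cannot handle the exception when one of the provided URLs is broken. Loop execution should stop as soon as any request returns an IP.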
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED

SERVICES = [
    ('ip-api1', 'http://ip-api.com/json', 'query'),
    ('broken_api', 'http://broken', 'query'),
]

async def get_ip(session, service, url, ip_attr):
    print('fetching ip from: {}'.format(service))
    async with session.get(url) as resp:
        json_resp = await resp.json()
        ip = json_resp[ip_attr]
        return service, ip

async def get_ip_from_services():
    async with aiohttp.ClientSession() as session:
        coro_obj = [get_ip(session, service[0], service[1], service[2])
                    for service in SERVICES]
        try:
            done, pending = await asyncio.wait(coro_obj, return_when=FIRST_COMPLETED)
            result = done.pop().result()
            print(result)
            for future in pending:
                future.cancel()
        except:
            print('catched')

loop = asyncio.get_event_loop()
loop.run_until_complete(get_ip_from_services())
loop.close()
I can catch the exception with return_when=ALL_COMPLETED, but not with FIRST_COMPLETED.
I get the following error:
fetching ip from: ip-api1
fetching ip from: broken_api
catched
exception calling callback for <Future at 0x103839240 state=finished returned list>
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 297, in _invoke_callbacks
callback(self)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 419, in _call_set_state
dest_loop.call_soon_threadsafe(_set_state, destination, source)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
self._check_closed()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<ThreadedResolver.resolve() running at /Volumes/external/venv/lib/python3.6/site-packages/aiohttp/resolver.py:31> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py:408, <TaskWakeupMethWrapper object at 0x103807948>()]> cb=[shield.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:679]>
Task was destroyed but it is pending!
task: <Task pending coro=<get_ip() done, defined at ./run_7.py:15> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1038073d8>()]>>
or these warnings:
fetching ip from: ip-api1
fetching ip from: broken_api
catched
Task was destroyed but it is pending!
task: <Task pending coro=<ThreadedResolver.resolve() running at /Volumes/external/venv/lib/python3.6/site-packages/aiohttp/resolver.py:31> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py:408, <TaskWakeupMethWrapper object at 0x1038e2948>()]> cb=[shield.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:679]>
Task was destroyed but it is pending!
task: <Task pending coro=<get_ip() done, defined at ./run_7.py:15> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1038e23d8>()]>>
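I think this is a bug in aiohttp, and I have reported it to the aiohttp project.
The exception you get does not look critical, so you can leave things as they are until it is fixed in aiohttp. Alternatively, if you don't want to see it for now, just don't cancel the requests.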
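On the question of catching the failure itself: asyncio.wait does not raise a task's exception. The exception is stored on the finished task and is only re-raised when you call result(), so you can inspect it with exception() instead. Below is a minimal sketch of that, assuming Python 3.7+ (for asyncio.run); ok() and broken() are hypothetical stand-ins for the real HTTP requests and do no network access.

```python
import asyncio
from contextlib import suppress


async def ok():
    # Hypothetical stand-in for a service that answers after a short delay.
    await asyncio.sleep(0.05)
    return "ok"


async def broken():
    # Hypothetical stand-in for a request to an unreachable host.
    raise ConnectionError("cannot connect")


async def main():
    tasks = [asyncio.ensure_future(ok()), asyncio.ensure_future(broken())]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    errors = []
    for task in done:
        # exception() returns the stored exception (or None) without raising;
        # result() would re-raise it here.
        exc = task.exception()
        if exc is not None:
            errors.append(exc)

    # Clean up whatever is still running before leaving the coroutine.
    for task in pending:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
    return errors


errors = asyncio.run(main())
print(errors)
```

Here the failing task finishes first, so it is the one that ends up in done, which is most likely what happens with the broken URL in the question as well.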
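A few notes unrelated to the error itself:
1) Don't mix concurrent.futures with asyncio: they are different modules. Use asyncio.FIRST_COMPLETED instead.
2) You should not only cancel a task, but also await it until it is actually cancelled (awaiting it raises CancelledError). That can be done with the following code: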
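from contextlib import suppress  # at module level

# inside the coroutine, after asyncio.wait() has returned:
for task in pending:
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task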
See the asyncio documentation on tasks and cancellation for more details.
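Putting the two notes together, one way to get "stop as soon as any service returns an IP" is to keep calling asyncio.wait on the remaining tasks until one of them finishes without an exception. The following is only a sketch under stated assumptions: Python 3.7+ (for asyncio.run), and fake_service() and first_successful() are hypothetical names introduced for illustration, with asyncio.sleep standing in for the aiohttp requests. The coroutines are wrapped in tasks explicitly because passing bare coroutines to asyncio.wait was deprecated in Python 3.8 and removed in 3.11.

```python
import asyncio
from contextlib import suppress


async def fake_service(name, delay, fail=False):
    # Hypothetical stand-in for get_ip(); no network access is performed.
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError("{} is unreachable".format(name))
    return name, "203.0.113.7"  # placeholder address, not a real lookup


async def first_successful(coros):
    pending = {asyncio.ensure_future(c) for c in coros}
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                if task.exception() is None:
                    return task.result()
                # A failed service is reported and skipped, not fatal.
                print("failed:", task.exception())
        raise RuntimeError("all services failed")
    finally:
        # Cancel the tasks that are still running and wait for them to stop.
        for task in pending:
            task.cancel()
            with suppress(asyncio.CancelledError):
                await task


result = asyncio.run(first_successful([
    fake_service("broken_api", 0.01, fail=True),
    fake_service("ip-api1", 0.05),
    fake_service("slow_api", 1.0),
]))
print(result)
```

Doing the cleanup in a finally block means the leftover tasks are cancelled and awaited whether a result was found or every service failed, so nothing is still pending when the loop closes.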