Python asyncio 没有显示任何错误
Python asyncio does not show any errors
我正在尝试使用 asyncio 从数千个 url 中获取一些数据。
以下是设计的简要概述:
- 使用单个
Producer
一次性填满 Queue
一堆网址
- 生成一堆
Consumers
- 每个
Consumer
保持从 Queue
异步提取 url 并发送 GET
请求
- 对结果做一些后处理
- 合并所有处理的结果和return
问题: asyncio
几乎从不显示任何错误,它只是静静地挂起而没有错误。我到处放 print
语句来自己检测问题,但没有太大帮助。
根据输入网址的数量和消费者或限制的数量,我可能会遇到这些错误:
Task was destroyed but it is pending!
task exception was never retrieved future: <Task finished coro=<consumer()
aiohttp.client_exceptions.ServerDisconnectedError
aiohttp.client_exceptions.ClientOSError: [WinError 10053] An established connection was aborted by the software in your host machine
问题:如何检测和处理asyncio
中的异常?如何在不中断 Queue
的情况下重试?
Bellow 是我在查看各种异步代码示例时编译的代码。目前,def get_video_title
函数末尾存在故意错误。当 运行 时,什么也没有显示。
import asyncio
import aiohttp
import json
import re
import nest_asyncio
nest_asyncio.apply() # jupyter notebook throws errors without this
user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"
def get_video_title(data):
match = re.search(r'window\[["\']ytInitialPlayerResponse["\']\]\s*=\s*(.*)', data)
string = match[1].strip()[:-1]
result = json.loads(string)
return result['videoDetails']['TEST_ERROR'] # <---- should be 'title'
async def fetch(session, url, c):
async with session.get(url, headers={"user-agent": user_agent}, raise_for_status=True, timeout=60) as r:
print('---------Fetching', c)
if r.status != 200:
r.raise_for_status()
return await r.text()
async def consumer(queue, session, responses):
while True:
try:
i, url = await queue.get()
print("Fetching from a queue", i)
html_page = await fetch(session, url, i)
print('+++Processing', i)
result = get_video_title(html_page) # should raise an error here!
responses.append(result)
queue.task_done()
print('+++Task Done', i)
except (aiohttp.http_exceptions.HttpProcessingError, asyncio.TimeoutError) as e:
print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Error', i, type(e))
await asyncio.sleep(1)
queue.task_done()
async def produce(queue, urls):
for i, url in enumerate(urls):
print('Putting in a queue', i)
await queue.put((i, url))
async def run(session, urls, consumer_num):
queue, responses = asyncio.Queue(maxsize=2000), []
print('[Making Consumers]')
consumers = [asyncio.ensure_future(
consumer(queue, session, responses))
for _ in range(consumer_num)]
print('[Making Producer]')
producer = await produce(queue=queue, urls=urls)
print('[Joining queue]')
await queue.join()
print('[Cancelling]')
for consumer_future in consumers:
consumer_future.cancel()
print('[Returning results]')
return responses
async def main(loop, urls):
print('Starting a Session')
async with aiohttp.ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=300)) as session:
print('Calling main function')
posts = await run(session, urls, 100)
print('Done')
return posts
if __name__ == '__main__':
urls = ['https://www.youtube.com/watch?v=dNQs_Bef_V8'] * 100
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(loop, urls))
问题是您的 consumer
只捕获了两个非常具体的异常,并且在它们的情况下将任务标记为已完成。如果发生任何其他异常,例如与网络相关的异常,它将终止消费者。但是,run
未检测到这一点,它正在后台等待 queue.join()
消费者(有效地)运行。这就是您的程序挂起的原因 - 永远不会考虑排队的项目,并且永远不会完全处理队列。
有两种方法可以解决此问题,具体取决于您希望程序在遇到意外异常时执行的操作。如果你想让它保持 运行,你可以向消费者添加一个包罗万象的 except
子句,例如:
except Exception as e
print('other error', e)
queue.task_done()
备选方案是将 未处理的 消费者异常传播到 run
。这必须明确安排,但具有永远不允许异常静默传递的优点。 (有关该主题的详细处理,请参阅 this article。)实现它的一种方法是同时等待 queue.join()
和消费者;由于消费者处于无限循环中,因此只有在出现异常时才会完成。
print('[Joining queue]')
# wait for either `queue.join()` to complete or a consumer to raise
done, _ = await asyncio.wait([queue.join(), *consumers],
return_when=asyncio.FIRST_COMPLETED)
consumers_raised = set(done) & set(consumers)
if consumers_raised:
await consumers_raised.pop() # propagate the exception
Questions: how to detect and handle exceptions in asyncio?
异常通过 await
传播,并且通常像在任何其他代码中一样检测和处理。特殊处理只需要捕获从 "background" 任务泄漏的异常,例如 consumer
.
how to retry without disrupting the Queue ?
您可以在except
块中调用await queue.put((i, url))
。该项目将被添加到队列的后面,由消费者拿起。在那种情况下,您只需要第一个片段,并且不想费心尝试将 consumer
中的异常传播到 run
.
我正在尝试使用 asyncio 从数千个 url 中获取一些数据。 以下是设计的简要概述:
- 使用单个
Producer
一次性填满 - 生成一堆
Consumers
- 每个
Consumer
保持从Queue
异步提取 url 并发送GET
请求 - 对结果做一些后处理
- 合并所有处理的结果和return
Queue
一堆网址
问题: asyncio
几乎从不显示任何错误,它只是静静地挂起而没有错误。我到处放 print
语句来自己检测问题,但没有太大帮助。
根据输入网址的数量和消费者或限制的数量,我可能会遇到这些错误:
Task was destroyed but it is pending!
task exception was never retrieved future: <Task finished coro=<consumer()
aiohttp.client_exceptions.ServerDisconnectedError
aiohttp.client_exceptions.ClientOSError: [WinError 10053] An established connection was aborted by the software in your host machine
问题:如何检测和处理asyncio
中的异常?如何在不中断 Queue
的情况下重试?
Bellow 是我在查看各种异步代码示例时编译的代码。目前,def get_video_title
函数末尾存在故意错误。当 运行 时,什么也没有显示。
import asyncio
import aiohttp
import json
import re
import nest_asyncio
nest_asyncio.apply() # jupyter notebook throws errors without this
user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"
def get_video_title(data):
match = re.search(r'window\[["\']ytInitialPlayerResponse["\']\]\s*=\s*(.*)', data)
string = match[1].strip()[:-1]
result = json.loads(string)
return result['videoDetails']['TEST_ERROR'] # <---- should be 'title'
async def fetch(session, url, c):
async with session.get(url, headers={"user-agent": user_agent}, raise_for_status=True, timeout=60) as r:
print('---------Fetching', c)
if r.status != 200:
r.raise_for_status()
return await r.text()
async def consumer(queue, session, responses):
while True:
try:
i, url = await queue.get()
print("Fetching from a queue", i)
html_page = await fetch(session, url, i)
print('+++Processing', i)
result = get_video_title(html_page) # should raise an error here!
responses.append(result)
queue.task_done()
print('+++Task Done', i)
except (aiohttp.http_exceptions.HttpProcessingError, asyncio.TimeoutError) as e:
print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Error', i, type(e))
await asyncio.sleep(1)
queue.task_done()
async def produce(queue, urls):
for i, url in enumerate(urls):
print('Putting in a queue', i)
await queue.put((i, url))
async def run(session, urls, consumer_num):
queue, responses = asyncio.Queue(maxsize=2000), []
print('[Making Consumers]')
consumers = [asyncio.ensure_future(
consumer(queue, session, responses))
for _ in range(consumer_num)]
print('[Making Producer]')
producer = await produce(queue=queue, urls=urls)
print('[Joining queue]')
await queue.join()
print('[Cancelling]')
for consumer_future in consumers:
consumer_future.cancel()
print('[Returning results]')
return responses
async def main(loop, urls):
print('Starting a Session')
async with aiohttp.ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=300)) as session:
print('Calling main function')
posts = await run(session, urls, 100)
print('Done')
return posts
if __name__ == '__main__':
urls = ['https://www.youtube.com/watch?v=dNQs_Bef_V8'] * 100
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(loop, urls))
问题是您的 consumer
只捕获了两个非常具体的异常,并且在它们的情况下将任务标记为已完成。如果发生任何其他异常,例如与网络相关的异常,它将终止消费者。但是,run
未检测到这一点,它正在后台等待 queue.join()
消费者(有效地)运行。这就是您的程序挂起的原因 - 永远不会考虑排队的项目,并且永远不会完全处理队列。
有两种方法可以解决此问题,具体取决于您希望程序在遇到意外异常时执行的操作。如果你想让它保持 运行,你可以向消费者添加一个包罗万象的 except
子句,例如:
except Exception as e
print('other error', e)
queue.task_done()
备选方案是将 未处理的 消费者异常传播到 run
。这必须明确安排,但具有永远不允许异常静默传递的优点。 (有关该主题的详细处理,请参阅 this article。)实现它的一种方法是同时等待 queue.join()
和消费者;由于消费者处于无限循环中,因此只有在出现异常时才会完成。
print('[Joining queue]')
# wait for either `queue.join()` to complete or a consumer to raise
done, _ = await asyncio.wait([queue.join(), *consumers],
return_when=asyncio.FIRST_COMPLETED)
consumers_raised = set(done) & set(consumers)
if consumers_raised:
await consumers_raised.pop() # propagate the exception
Questions: how to detect and handle exceptions in asyncio?
异常通过 await
传播,并且通常像在任何其他代码中一样检测和处理。特殊处理只需要捕获从 "background" 任务泄漏的异常,例如 consumer
.
how to retry without disrupting the Queue ?
您可以在except
块中调用await queue.put((i, url))
。该项目将被添加到队列的后面,由消费者拿起。在那种情况下,您只需要第一个片段,并且不想费心尝试将 consumer
中的异常传播到 run
.