使用asyncio.wait_for和asyncio.Semaphore时如何正确捕捉concurrent.futures._base.TimeoutError?

How to catch concurrent.futures._base.TimeoutError correctly when using asyncio.wait_for and asyncio.Semaphore?

首先,我需要警告你:我是 asyncio 的新手,我会 我马上警告你,我是 asyncio 的新手,我很难想象引擎盖下的库里有什么。

这是我的代码:

import asyncio

semaphore = asyncio.Semaphore(50)

async def work(value):
    async with semaphore:
        print(value)
        await asyncio.sleep(10)

async def main():
    tasks = []
    for i in range(0, 10000):
        tasks.append(asyncio.wait_for(work(i), timeout=3))
    await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(main())
loop.run_until_complete(future)

我需要的是:Coroutine work()完成时间不超过3秒,同时不超过50个。 3 秒后(超时),协程 work() 必须停止执行,并且新的 50 个任务必须开始工作。但就我而言,3 秒后崩溃:

Traceback (most recent call last):
  File "C:/Users/root/PycharmProjects/LogParser/ssh/async/asyn_test.py", line 19, in <module>
    loop.run_until_complete(future)
  File "C:\Code\Python3\lib\asyncio\base_events.py", line 579, in run_until_complete
    return future.result()
  File "C:/Users/root/PycharmProjects/LogParser/ssh/async/asyn_test.py", line 15, in main
    await asyncio.gather(*tasks)
  File "C:\Code\Python3\lib\asyncio\tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

无论我没有尝试捕获此异常,无论剩余多少任务​​,程序都会崩溃。我需要,在达到超时时,程序继续执行下一个任务 请教我,我需要如何正确实施它?

Python 3.7,异步 3.4.3

您需要处理异常。如果你只是把它传递给 gather,它会重新加注。例如,您可以使用适当的 try/except:

创建一个新协程
semaphore = asyncio.Semaphore(50)

async def work(value):
    print(value)
    await asyncio.sleep(10)

async def work_with_timeout(value):
    async with semaphore:
        try:
            return await asyncio.wait_for(work(value), timeout=3)
        except asyncio.TimeoutError:
            return None

async def main():
    tasks = []
    for i in range(0, 10000):
        tasks.append(work_with_timeout(i))
    await asyncio.gather(*tasks)