Cancelling asyncio task run in executor

I am scraping some websites, using asyncio to parallelize the requests library:

def run():
    asyncio.run(scrape())

def check_link(link):
    #.... code code code ...
    response = requests.get(link)
    #.... code code code ...
    write_some_stats_into_db()

async def scrape():
    #.... code code code ...
    task = asyncio.get_event_loop().run_in_executor(None, check_link, link)
    #.... code code code ...
    if done:
        for task in all_tasks:
            task.cancel()

I only need to find one 'correct' link, and then I can stop the program. However, because check_link runs in an executor, its threads are daemonized automatically, so even after calling task.cancel() I still have to wait for all the other running check_link calls to finish.

Do you have any idea how to 'force-kill' the other check_link threads running in the executor?
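For reference, a minimal sketch of the behaviour described above (slow_check is just a made-up stand-in for a blocking check, not part of the question): cancelling the future returned by run_in_executor marks it as cancelled, but the thread that is already executing the blocking call keeps running until it finishes on its own.

import asyncio
import time

def slow_check(n: int) -> None:
    # stand-in for a blocking check_link call
    time.sleep(2)
    print(f"check {n} finished anyway")

async def main():
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, slow_check, n) for n in range(3)]
    await asyncio.sleep(0.1)  # let the executor threads start
    for f in futures:
        f.cancel()  # the futures become "cancelled", but the threads keep running
    await asyncio.sleep(3)  # all three prints still appear roughly 2 s later

asyncio.run(main())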

You can do it as shown below. Actually, from my point of view, if you do not have to use asyncio for this task, it is better to use plain threads without any async loop, since asyncio makes your code more complex (see the thread-only sketch after the asyncio version).

import asyncio
from random import randint
import time
from functools import partial

# imagine that this is the array of links
LINKS = list(range(1000))

# how many thread workers you want to run simultaneously
WORKERS_NUM = 10

# signals all workers that the app should stop
STOP_EVENT = asyncio.Event()


def check_link(link: int) -> int:
    """checks link in another thread and returns result"""
    time.sleep(3)
    r = randint(1, 11)
    print(f"{link}____{r}\n")
    return r


async def check_link_wrapper(q: asyncio.Queue):
    """Async wrapper around sync function"""
    loop = asyncio.get_running_loop()

    while not STOP_EVENT.is_set():
        link = await q.get()

        # None is the feeder's "poison pill" (a plain `if not link`
        # would also wrongly trigger on link 0)
        if link is None:
            break

        value = await loop.run_in_executor(None, func=partial(check_link, link))

        if value == 10:
            STOP_EVENT.set()
            print("Hurray! We got TEN !")


async def feeder(q: asyncio.Queue):
    """Send tasks and "poison pill" to all workers"""
    # send tasks to workers
    for link in LINKS:
        await q.put(link)

    # ask workers to stop
    for _ in range(WORKERS_NUM):
        await q.put(None)


async def amain():
    """Main async function of the app"""
    # maxsize is one since we want the app to stop
    # as fast as possible once the stop condition is met
    q = asyncio.Queue(maxsize=1)
    # create a separate task since we do not want to await the feeder;
    # we are only interested in the workers. Keep a reference so the
    # task is not garbage-collected mid-run
    feeder_task = asyncio.create_task(feeder(q))
    await asyncio.gather(
        *[check_link_wrapper(q) for _ in range(WORKERS_NUM)],
    )


if __name__ == '__main__':
    asyncio.run(amain())
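And, for comparison, a thread-only sketch of the same idea without any asyncio, mentioned above. It repeats check_link so the snippet stays self-contained; a threading.Event plays the role of the stop flag, a queue.Queue replaces the asyncio queue, and the worker/main helpers are illustrative names, not part of the original answer.

import queue
import threading
import time
from random import randint

LINKS = list(range(1000))
WORKERS_NUM = 10

# signals all workers that the app should stop
STOP_EVENT = threading.Event()


def check_link(link: int) -> int:
    """same blocking check as above"""
    time.sleep(3)
    r = randint(1, 11)
    print(f"{link}____{r}\n")
    return r


def worker(q: queue.Queue) -> None:
    while True:
        link = q.get()
        if link is None:          # poison pill: no more work
            break
        if STOP_EVENT.is_set():   # stop flag raised, just drain the queue
            continue
        if check_link(link) == 10:
            STOP_EVENT.set()
            print("Hurray! We got TEN !")


def main():
    q = queue.Queue(maxsize=1)
    threads = [threading.Thread(target=worker, args=(q,)) for _ in range(WORKERS_NUM)]
    for t in threads:
        t.start()

    # feed links until the stop flag is raised ...
    for link in LINKS:
        if STOP_EVENT.is_set():
            break
        q.put(link)

    # ... then send one poison pill per worker and wait for them
    for _ in range(WORKERS_NUM):
        q.put(None)
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()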