Run slow background blocking task from asyncio loop

I have an asyncio crawler that visits URLs and collects new URLs from the HTML responses. I was inspired by this great tool: https://github.com/aio-libs/aiohttp/blob/master/examples/legacy/crawl.py

Here is a very simplified workflow that shows how it works:

import asyncio
import aiohttp

class Requester:

    def __init__(self):
        self.sem = asyncio.BoundedSemaphore(1)

    async def fetch(self, url, client):
        async with client.get(url) as response:
            data = (await response.read()).decode('utf-8', 'replace')
            print("URL:", url, " have code:", response.status)
            return response, data

    async def run(self, urls):
        async with aiohttp.ClientSession() as client:
            for url in urls:
                await self.sem.acquire()
                task = asyncio.create_task(self.fetch(url, client))
                task.add_done_callback(lambda t: self.sem.release())

    def http_crawl(self, _urls_list):
        loop = asyncio.get_event_loop()
        crawl_loop = asyncio.ensure_future(self.run(_urls_list))
        loop.run_until_complete(crawl_loop)

r = Requester()
_url_list = ['https://www.google.com','https://images.google.com','https://maps.google.com','https://mail.google.com','https://news.google.com','https://video.google.com','https://books.google.com']
r.http_crawl(_url_list)

What I need now is to add some very slow BeautifulSoup-based functions. I need those functions not to block the main loop and to run as background work; for example, that is where I will process the HTTP responses.
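Just to make the intent concrete, the kind of slow parsing step I have in mind would look roughly like this (parse_links is a hypothetical helper, not part of my crawler yet; the actual parsing logic does not matter here):

from bs4 import BeautifulSoup

def parse_links(data):
    # Hypothetical CPU-heavy step: pull every href out of the HTML.
    soup = BeautifulSoup(data, 'html.parser')
    return [a.get('href') for a in soup.find_all('a') if a.get('href')]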

I read the Python docs and found: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
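As far as I understand that documentation page, the pattern is roughly this (a minimal sketch; blocking_work stands in for any CPU-bound function):

import asyncio
import concurrent.futures

def blocking_work():
    # Stand-in for any CPU-bound function.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # Run the blocking function in a worker process without blocking the loop.
        result = await loop.run_in_executor(pool, blocking_work)
        print(result)

asyncio.run(main())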

I tried to add it to my code, but it does not work properly (I use cpu_bound only for demonstration):

import asyncio
import aiohttp
import concurrent.futures

def cpu_bound():
    return sum(i * i for i in range(10 ** 7))

class Requester:

    def __init__(self):
        self.sem = asyncio.BoundedSemaphore(1)

    async def fetch(self, url, client):
        async with client.get(url) as response:
            data = (await response.read()).decode('utf-8', 'replace')
            print("URL:", url, " have code:", response.status)
            ####### Blocking operation #######
            loop = asyncio.get_running_loop()
            with concurrent.futures.ProcessPoolExecutor() as pool:
                result = await loop.run_in_executor(pool, cpu_bound)
                print('custom process pool', result)
            #################################
            return response, data

    async def run(self, urls):
        async with aiohttp.ClientSession() as client:
            for url in urls:
                await self.sem.acquire()
                task = asyncio.create_task(self.fetch(url, client))
                task.add_done_callback(lambda t: self.sem.release())

    def http_crawl(self, _urls_list):
        loop = asyncio.get_event_loop()
        crawl_loop = asyncio.ensure_future(self.run(_urls_list))
        loop.run_until_complete(crawl_loop)

r = Requester()
_url_list = ['https://www.google.com','https://images.google.com','https://maps.google.com','https://mail.google.com','https://news.google.com','https://video.google.com','https://books.google.com']
r.http_crawl(_url_list)

At the moment it does not work as expected; it blocks the HTTP requests every time:

URL: https://www.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://images.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://maps.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://mail.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://news.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://video.google.com  have code: 200
custom process pool 333333283333335000000

How do I correctly push such tasks into the background from the asyncio main process?

Is there a best practice for doing this in a simple way, or should I use Redis for task planning?

I think that because you set the BoundedSemaphore to 1, only one instance of the task is allowed to run at a time.

You can use the ratelimiter package to limit the number of concurrent requests within a certain period of time.
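If you prefer to stay in the standard library instead of adding a dependency, a plain asyncio.Semaphore with a higher limit gives a similar effect; a minimal sketch, assuming you allow up to 5 requests at a time (the limit is just an example):

import asyncio

sem = asyncio.Semaphore(5)  # example limit: up to 5 concurrent fetches

async def limited_fetch(url, client):
    # The semaphore caps concurrency without serializing everything into one task.
    async with sem:
        async with client.get(url) as response:
            return await response.read()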

I will also post the code that works for me. It uses two separate asyncio queues, and one of them hands the high-CPU-consuming work off to separate processes:

import asyncio
import functools
import aiohttp
import concurrent.futures

def cpu_bound(num):
    return sum(i * i for i in range(10 ** num))

class Requester:

    def __init__(self):
        self.threads = 3
        self.threads2 = 10
        self.pool = concurrent.futures.ProcessPoolExecutor()

    async def fetch(self, url):
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with self.client.get(url, allow_redirects=False, verify_ssl=False, timeout=timeout) as response:
                data = (await response.read()).decode('utf-8', 'replace')
                print("URL:", url, " have code:", response.status)
                resp_list = {'url': str(response.real_url), 'data': str(data), 'headers': dict(response.headers)}
                return resp_list

        except Exception as err:
            print(err)
            return {}

    async def heavy_worker(self, a):
        while True:
            resp_list = await a.get()
            if resp_list:
                ####### Blocking operation #######
                try:
                    loop = asyncio.get_running_loop()
                    result = await loop.run_in_executor(self.pool, functools.partial(cpu_bound, num=5))
                    print('wappalazer', result)
                except Exception as err:
                    print(err)
                #################################
                a.task_done()

            else:
                a.task_done()

    async def fetch_worker(self, q, a):
        while True:
            url = await q.get()
            resp_list = await self.fetch(url)
            q.task_done()
            await a.put(resp_list)

    async def main(self, urls):
        # Create the queues we will use to store our "workload".
        q = asyncio.Queue()
        a = asyncio.Queue()

        # Create worker tasks to process the queues concurrently.
        workers_fetch = [asyncio.create_task(self.fetch_worker(q, a)) for _ in range(self.threads)]
        workers_heavy = [asyncio.create_task(self.heavy_worker(a)) for _ in range(self.threads2)]

        for url in urls:
            await q.put(url)

        # wait for all tasks to be processed
        await q.join()
        await a.join()

        # Cancel our worker tasks.
        for worker in workers_fetch:
            worker.cancel()
        await asyncio.gather(*workers_fetch, return_exceptions=True)
        for worker in workers_heavy:
            worker.cancel()
        await asyncio.gather(*workers_heavy, return_exceptions=True)

    async def run(self, _urls_list):
        async with aiohttp.ClientSession() as self.client:
            task_for_first_run = asyncio.create_task(self.main(_urls_list))
            await asyncio.sleep(1)

            await task_for_first_run
            print("All tasks completed")

    def http_crawl(self, _urls_list):
        asyncio.run(self.run(_urls_list))

r = Requester()
_url_list = ['http://aaaaaaaaaaaaaaaa.aaaaaaaaaaaaaaaaaaa.aa', 'https://www.google.com','https://images.google.com','https://maps.google.com','https://mail.google.com',
             'https://news.google.com','https://video.google.com','https://books.google.com', 'https://www.google.com',
             'https://images.google.com','https://maps.google.com','https://mail.google.com','https://news.google.com',
             'https://video.google.com','https://books.google.com', 'https://www.google.com','https://images.google.com',
             'https://maps.google.com','https://mail.google.com','https://news.google.com','https://video.google.com',
             'https://books.google.com', 'https://www.google.com','https://images.google.com','https://maps.google.com',
             'https://mail.google.com','https://news.google.com','https://video.google.com','https://books.google.com',
             'https://www.google.com','https://images.google.com','https://maps.google.com','https://mail.google.com',
             'https://news.google.com','https://video.google.com','https://books.google.com']
r.http_crawl(_url_list)
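Two details in this version are what make the difference: the ProcessPoolExecutor is created once in __init__ and reused for every call, instead of being created and torn down inside fetch for each request, and the fetch workers and the heavy workers only talk to each other through the queues, so a slow cpu_bound call never holds up the coroutines that are doing the HTTP requests.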