Run a non-async function that simply builds a large list in an executor?

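The logic in the code is to pull data through an (async) HTTP request and then build a large list of dictionaries in which one of the values is randomly generated: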

import asyncio
import random
import string
import time

from concurrent.futures import ProcessPoolExecutor
from itertools import cycle

from httpx import AsyncClient

URL = 'http://localhost:8080'
COUNT = 1_000_000


def rand_str(length=10):
    return ''.join(random.choice(string.ascii_uppercase) for i in range(length))


def parser(data, count):
    items = []

    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)

    return items


async def parser_coro(data, count):
    items = []

    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)

    return items


async def run_in_executor(func, pool, *args):  # loop.run_in_executor() takes positional args only
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, func, *args)


async def main():
    async with AsyncClient(base_url=URL) as client:
        r = await client.get('/api/alerts/')
        data = r.json()

    # Case 1
    t1 = time.perf_counter()
    parser(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 1 - sync: {t2 - t1:.3f}s')
    
    # Case 2
    t1 = time.perf_counter()
    await parser_coro(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 2 - coro (no await): {t2 - t1:.3f}s')

    # Case 3
    t1 = time.perf_counter()
    await run_in_executor(parser, None, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 3 - thread executor: {t2 - t1:.3f}s')

    # Case 4
    t1 = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        await run_in_executor(parser, executor, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 4 - process executor: {t2 - t1:.3f}s')


if __name__ == '__main__':
    asyncio.run(main(), debug=True)

Test:

$ python test.py 
Case 1 - sync: 6.593s
Case 2 - coro (no await): 6.565s
Executing <Task pending name='Task-1' coro=<main() running at test.py:63> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efff962a1f0>()] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:184] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:591> took 13.176 seconds
Case 3 - thread executor: 6.675s
Case 4 - process executor: 6.726s

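Questions:

Should I run the parser function in an executor so that it does not block the main thread while the list is being generated, or will that not help in this case? Is this actually a CPU-bound or an I/O-bound workload? I guess there is no I/O involved, but building the list is a CPU-intensive task, so the workload is CPU-bound?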

Should I run the parser function in the executor so it's not blocking the main thread while the list is generated or it won't help in this case?

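Yes, you should. Despite the global interpreter lock, using a separate thread will help, because Python will allow execution to switch from the parsing thread to the asyncio thread without parser even being aware of it. So using a thread will prevent the event loop from being blocked for 6 seconds, or however long it takes to run the function.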
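One way to see this effect is to run a small "heartbeat" task next to the CPU-bound call and count how often it gets to run. The following is only a minimal sketch: `busy`, `heartbeat`, and `count_ticks` are hypothetical names that do not appear in the question, and `busy` merely stands in for `parser`.

```python
import asyncio
import contextlib
import time


def busy(seconds):
    """CPU-bound stand-in for parser(): spin in pure Python for `seconds`."""
    deadline = time.perf_counter() + seconds
    n = 0
    while time.perf_counter() < deadline:
        n += 1
    return n


async def heartbeat(ticks):
    """Append a timestamp roughly every 10 ms whenever the loop is free."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def count_ticks(use_executor, seconds=0.3):
    """Return how many heartbeats ran while busy() was executing."""
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat task start
    before = len(ticks)
    if use_executor:
        loop = asyncio.get_running_loop()
        # None selects the loop's default ThreadPoolExecutor
        await loop.run_in_executor(None, busy, seconds)
    else:
        busy(seconds)  # runs on the event-loop thread and blocks it
    after = len(ticks)
    hb.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await hb
    return after - before


if __name__ == '__main__':
    print('direct call  :', asyncio.run(count_ticks(False)), 'heartbeats')
    print('in executor  :', asyncio.run(count_ticks(True)), 'heartbeats')
```

With the direct call the heartbeat cannot run at all while `busy` is executing; with the executor it keeps ticking, although the exact count depends on the machine and on how the interpreter schedules the two threads.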

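Note that the parser_coro variant is no different from the parser variant without the executor, because it doesn't await anything. await parser_coro(...) will block the event loop just like the executor-less call to parser(...).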
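The point about "not awaiting anything" can be illustrated with two tiny coroutines that log their progress. This is a hedged sketch, not code from the question: `no_await`, `with_yield`, `other`, and `demo` are made-up names, and `await asyncio.sleep(0)` is used only to show what an explicit yield to the loop changes.

```python
import asyncio


async def no_await(log):
    # Declared with `async def`, but never yields control to the event loop.
    for i in range(3):
        log.append(f'work {i}')


async def with_yield(log):
    # Same loop, but hands control back to the event loop after every step.
    for i in range(3):
        log.append(f'work {i}')
        await asyncio.sleep(0)


async def other(log):
    # A second task competing for the event loop.
    for i in range(3):
        log.append(f'other {i}')
        await asyncio.sleep(0)


async def demo(worker):
    log = []
    await asyncio.gather(worker(log), other(log))
    return log


if __name__ == '__main__':
    print(asyncio.run(demo(no_await)))
    # ['work 0', 'work 1', 'work 2', 'other 0', 'other 1', 'other 2']
    print(asyncio.run(demo(with_yield)))
    # ['work 0', 'other 0', 'work 1', 'other 1', 'work 2', 'other 2']
```

The first worker finishes all of its steps before the other task runs at all, which is what happens with parser_coro on a much larger scale.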

Is this actually a CPU or I/O bound workload in this case?

I can't comment on the rest of the workload, but the function as written is definitely CPU-bound.
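A rough way to check this for any function is to compare wall-clock time with the CPU time consumed by the process. The sketch below assumes a hypothetical helper `measure` and a simplified stand-in `build_list`; neither is part of the question's code.

```python
import time


def measure(func, *args):
    """Return (wall_seconds, cpu_seconds) for one call of func(*args)."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()  # CPU time of this process; sleeping is not counted
    func(*args)
    return time.perf_counter() - wall_start, time.process_time() - cpu_start


def build_list(n):
    """Simplified stand-in for parser(): build a list of small dicts."""
    return [{'instance': str(i)} for i in range(n)]


if __name__ == '__main__':
    for name, func, arg in [('sleep', time.sleep, 0.1),
                            ('build_list', build_list, 300_000)]:
        wall, cpu = measure(func, arg)
        print(f'{name}: wall={wall:.3f}s cpu={cpu:.3f}s')
```

For a CPU-bound call the two numbers tend to be close to each other, while for a call that mostly waits (here `time.sleep`, standing in for I/O) the CPU time stays near zero.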

Can I run this in the ThreadPoolExecutor so it's not blocking or it must be a ProcessPoolExecutor as it's a CPU-bound function?

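Of course you can run it in a ThreadPoolExecutor. It's just that if you have a bunch of them running in parallel, they will all share the same CPU core. (But they won't block other coroutines, because they will run off the event-loop thread.)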
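As a sketch of what "a bunch of them running in parallel" could look like, the snippet below submits several jobs to whichever executor is passed in and gathers the results. `build` and `run_many` are hypothetical names; `build` is a small stand-in for parser that, unlike the original, copies each dict instead of mutating the shared ones. `functools.partial` is used because `loop.run_in_executor()` does not forward keyword arguments.

```python
import asyncio
import functools
import random
import string
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def build(data, count=1000):
    """Stand-in for parser(): copy dicts from `data` and tag each with a random string."""
    items = []
    for i in range(count):
        item = dict(data[i % len(data)])  # copy, so every entry is a distinct dict
        item['instance'] = ''.join(random.choices(string.ascii_uppercase, k=10))
        items.append(item)
    return items


async def run_many(pool, data, jobs=4, count=1000):
    """Run `jobs` calls of build() in `pool` without blocking the event loop."""
    loop = asyncio.get_running_loop()
    calls = [
        # partial() binds the keyword argument before the call is handed to the pool
        loop.run_in_executor(pool, functools.partial(build, data, count=count))
        for _ in range(jobs)
    ]
    return await asyncio.gather(*calls)


if __name__ == '__main__':
    sample = [{'id': 1}, {'id': 2}]
    # Threads: the event loop stays responsive, but the jobs take turns holding the GIL.
    with ThreadPoolExecutor() as pool:
        results = asyncio.run(run_many(pool, sample))
    print('threads  :', [len(r) for r in results])
    # Processes: the jobs can use several cores; arguments and results are pickled.
    with ProcessPoolExecutor() as pool:
        results = asyncio.run(run_many(pool, sample))
    print('processes:', [len(r) for r in results])
```

Whether the process pool is actually faster depends on how much data has to be pickled in each direction; for a single call, as in Case 4 of the question, there is little to gain over a thread.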