Working of concurrent.futures.ThreadPoolExecutor max_workers when scaling up the application

I am new to Python programming. Most of my code uses asyncio, since I am making I/O calls to a database, although in certain cases I use non-async methods that are long running, such as a few Pandas calls to the database. To avoid blocking calls that would limit scalability, I execute the blocking methods with concurrent.futures.ThreadPoolExecutor as follows:

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    values = executor.map(func, data)

The data passed to func above is a collection of at most 2 items, so basically no more than 2 threads are needed. But when multiple users come in and the application needs to scale, what should the ideal max_workers value be at that point:

  1. Should it be per user, i.e. 2?
  2. Should it be the maximum possible value, as described in the link - https://docs.python.org/3/library/concurrent.futures.html

Changed in version 3.8: Default value of max_workers is changed to min(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.

  3. Or is it fine not to mention it at all and let the workers be created as required?

The point still stands: if 10 users start performing the same operation, do they end up using the same ThreadPoolExecutor (shared), or does each end up with a different executor, since this is not a shared object? I want to make sure the application does not suffer from a poor design when it scales up.
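For illustration, the two alternatives I am asking about would look roughly like this (a sketch with made-up handler names, not my actual application code):

import concurrent.futures

# Alternative (a): every user/request creates its own small pool
def handle_request_per_user(func, data):
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        return list(executor.map(func, data))

# Alternative (b): one module-level pool created once and shared by all requests
shared_executor = concurrent.futures.ThreadPoolExecutor()  # default max_workers

def handle_request_shared(func, data):
    return list(shared_executor.map(func, data))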

If you call ThreadPoolExecutor from async code, you should use asyncio's run_in_executor function, otherwise it will block the main event loop.
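For example, the executor.map call from the question could be rewritten with run_in_executor so that the blocking work runs on the pool while the coroutine simply awaits it (a minimal sketch; func here is just a stand-in for the real blocking Pandas/DB call):

import asyncio
import concurrent.futures
import time


def func(item):
    # Stand-in for the long-running blocking call (e.g. a Pandas/DB query).
    time.sleep(1)
    return item * 2


async def process(data):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Awaitable equivalent of executor.map(func, data): each item is
        # scheduled on the pool without blocking the event loop.
        futures = [loop.run_in_executor(executor, func, item) for item in data]
        return await asyncio.gather(*futures)


async def main():
    print(await process([1, 2]))  # [2, 4]


asyncio.run(main())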

If the extra workload is CPU bound, then you should use ProcessPoolExecutor instead.

Example from the Python docs:

import asyncio
import concurrent.futures

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

As for max_workers, the defaults are usually fine:

  • ThreadPoolExecutor: min(32, (os.cpu_count() or 1) + 4)

  • ProcessPoolExecutor: os.cpu_count() or 1
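For instance, you can compute what those defaults work out to on your machine (a tiny illustration, not part of the original answer):

import os

cpus = os.cpu_count() or 1
print("ThreadPoolExecutor default max_workers:", min(32, cpus + 4))
print("ProcessPoolExecutor default max_workers:", cpus)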

It depends on your workload (CPU vs. I/O bound), but for CPU-bound tasks there is no point in setting it higher than the number of available CPUs, since that may actually degrade performance due to context switching and so on.

Both executors use a queue to enqueue and schedule tasks onto the available threads/processes.
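As a small illustration of that queueing (a sketch, not from the original answer): with max_workers=2, four submitted tasks mean two start immediately while the other two sit in the pool's internal queue until a worker frees up.

import time
from concurrent.futures import ThreadPoolExecutor

start = time.perf_counter()

def work(i):
    # Tasks 0 and 1 start right away; tasks 2 and 3 wait ~1s in the queue.
    print(f"task {i} started after {time.perf_counter() - start:.1f}s")
    time.sleep(1)
    return i

with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(work, range(4)))

print(f"all done after {time.perf_counter() - start:.1f}s")  # ~2s with 2 workers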

Update: Thu Mar 25 15:17:51 UTC 2021

The asyncio event loop is single-threaded, so the problem shows up when you schedule other coroutines at the same time. As you can see, the none-blocking task is blocked by the blocking executor for 10 seconds:

$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
END none-blocking: (elapsed: 10.0s)

If you run this a few times and the blocking executor happens to start first, the none-blocking task will not even start until the blocking executor has finished:

$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)

When you comment out the blocking executor, you can see that all the calls are now asynchronous:

$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)
END none-blocking executor: (elapsed: 5.0s)

The point is that once you start writing asynchronous code, you cannot mix it with synchronous calls.

test.py:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking(msg, t):
    t1 = time.perf_counter()

    print(f"START {msg}: (scheduled: {t}s)")
    time.sleep(t)
    print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")


async def task1(msg, t):
    t1 = time.perf_counter()

    print(f"START {msg}: (scheduled: {t}s)")
    await asyncio.sleep(t)
    print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")


async def task2(msg, t):
    # future.result() is a synchronous, blocking call inside a coroutine,
    # so it stalls the whole event loop until the thread finishes.
    with ThreadPoolExecutor() as executor:
        future = executor.submit(blocking, msg, t)
        future.result()


async def main():
    loop = asyncio.get_running_loop()

    aws = [
        task1("none-blocking", 1.0),
        # run_in_executor returns an awaitable, so the loop is not blocked here
        loop.run_in_executor(None, blocking, "none-blocking executor", 5.0),
        task2("blocking executor", 10.0),
    ]

    for coro in asyncio.as_completed(aws):
        await coro


if __name__ == "__main__":
    asyncio.run(main())