Working of concurrent.futures.ThreadPoolExecutor max_workers when scaling up the application
I am new to Python programming. Most of my code uses asyncio, since I am making I/O calls to a database, though in certain cases I use non-async methods that are long running, such as a few Pandas calls to the database. To avoid blocking calls that would limit scalability, I use concurrent.futures.ThreadPoolExecutor to execute the blocking methods as follows:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    values = executor.map(func, data)
The data supplied to func above is a collection with a maximum length of 2, so basically no more than 2 threads are needed. But when multiple users come in and the application needs to scale, what should the ideal max_workers value be at that point:
- Should it be per user, i.e. 2
- Should it be the maximum possible value, as described at https://docs.python.org/3/library/concurrent.futures.html
Changed in version 3.8: Default value of max_workers is changed to min(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.
- Or is it fine to leave it unspecified, so it is generated as required

The main point remains: if 10 users start performing the same operation, do they end up using the same ThreadPoolExecutor (shared), or do they each end up with a different executor, since it is not a shared object? I want to make sure the application does not suffer from a poor design when scaling up.
If you call a ThreadPoolExecutor from async code, you should use the asyncio loop.run_in_executor function, otherwise it will block the main event loop.

If the extra workload is CPU bound, then you should use a ProcessPoolExecutor instead.

Example from the Python docs:
import asyncio
import concurrent.futures

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())
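For I/O-bound blocking calls such as the Pandas/database methods from the question, the same pattern works with a ThreadPoolExecutor. A minimal sketch, where blocking_query is a hypothetical stand-in for your own blocking method:

```python
import asyncio
import concurrent.futures
import time

def blocking_query(n):
    # Stand-in for a blocking I/O call (e.g. a Pandas read from the DB).
    time.sleep(0.1)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        # Schedule both blocking calls on the pool; the event loop stays
        # free to run other coroutines while the workers sleep.
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_query, 1),
            loop.run_in_executor(pool, blocking_query, 2),
        )
    print(results)  # [2, 4]

asyncio.run(main())
```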
As for max_workers, the defaults are usually fine:

ThreadPoolExecutor: min(32, (os.cpu_count() or 1) + 4)
ProcessPoolExecutor: os.cpu_count() or 1

It depends on your workload (CPU vs. I/O bound), but for CPU-bound tasks there is no point in setting it higher than the number of available CPUs, since doing so can actually degrade performance due to context switching etc.

Both executors use a queue to enqueue tasks and schedule them onto the available threads/processes.
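You can observe that queueing directly: with max_workers=2, submitting more than two tasks simply enqueues the extras until a worker thread frees up. A small sketch:

```python
import concurrent.futures
import threading
import time

def work(i):
    # Record which thread ran each task; with max_workers=2 only two
    # distinct worker threads can ever appear.
    time.sleep(0.1)
    return threading.current_thread().name

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Six tasks, but only two workers: the remaining four wait in the queue.
    names = list(executor.map(work, range(6)))

print(sorted(set(names)))  # at most 2 distinct worker thread names
```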
Update: Thu Mar 25 15:17:51 UTC 2021
The asyncio event loop is single-threaded, so the issue shows up when you schedule other coroutines concurrently. As you can see below, the none-blocking task is blocked for 10 seconds by the blocking executor:
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
END none-blocking: (elapsed: 10.0s)
If you run this a few times and the blocking executor happens to start first, the none-blocking task will not even start before the blocking executor ends:
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)
When you comment out the blocking executor, you can see that all the calls are now asynchronous:
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)
END none-blocking executor: (elapsed: 5.0s)
The point is that once you start writing asynchronous code, you cannot mix it with synchronous calls that block the event loop.

test.py:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking(msg, t):
    t1 = time.perf_counter()
    print(f"START {msg}: (scheduled: {t}s)")
    time.sleep(t)
    print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")

async def task1(msg, t):
    t1 = time.perf_counter()
    print(f"START {msg}: (scheduled: {t}s)")
    await asyncio.sleep(t)
    print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")

async def task2(msg, t):
    with ThreadPoolExecutor() as executor:
        future = executor.submit(blocking, msg, t)
        future.result()

async def main():
    loop = asyncio.get_running_loop()
    aws = [
        task1("none-blocking", 1.0),
        loop.run_in_executor(None, blocking, "none-blocking executor", 5.0),
        task2("blocking executor", 10.0),
    ]
    for coro in asyncio.as_completed(aws):
        await coro

if __name__ == "__main__":
    asyncio.run(main())
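For completeness, the blocking task2 above can be fixed by awaiting the future through loop.run_in_executor instead of calling future.result() synchronously. A sketch of the corrected coroutine (task2_fixed is a hypothetical name):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking(msg, t):
    print(f"START {msg}")
    time.sleep(t)
    print(f"END {msg}")

async def task2_fixed(msg, t):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        # Awaiting the executor future keeps the event loop free while
        # the blocking call runs in a worker thread.
        await loop.run_in_executor(executor, blocking, msg, t)

asyncio.run(task2_fixed("fixed executor", 0.1))
```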