Python multiprocessing - number of threadpools not multiplied by number of pools

I have some Python 3.9 code that uses Pool and ThreadPool from multiprocessing.pool. The aim is to have a Pool of 2 processes, each of which independently spawns a ThreadPool of 3 threads. In other words, I expect 2 * 3 = 6 threads running in parallel.

However, the output of the minimal working example (MWE) below shows only 3 distinct thread IDs.

My question: why does this happen, and how can I fix it properly?

Also, if this whole N_POOL * N_THREADPOOL strategy is poor design, suggestions are welcome. The actual task is I/O-bound (network downloads followed by light preprocessing), and I am fairly new to parallelism.

MWE code

from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import Queue
from threading import get_ident
import random
from time import sleep
from functools import partial

# Params
N = 12
N_CPU = 2
N_THREAD = 3
# Just for CPU numbering
CPU_QUEUE = Queue(N_CPU)
for i in range(1, 1 + N_CPU):
    CPU_QUEUE.put(i)


def split_list_to_pools(ls_data, n_pools):
    """Split data into pools as lists of approx. equal lengths."""
    n_each = int((len(ls_data) - 1) / n_pools) + 1
    return [ls_data[n_each * i:n_each * (i + 1)] for i in range(n_pools)]


def process_threadpool(data, CPU_NO=-1):
    """Process incoming data one-by-one"""
    sleep(3 + random.random())
    print(f"Threadpool id: {get_ident()} CPU_NO: {CPU_NO} / {N_CPU}, data: {data}")


def process_pool(ls_data):
    """Process a list of data."""
    # Get initial pool status
    CPU_NO = CPU_QUEUE.get()
    print(f"Pool CPU_NO: {CPU_NO}, data: {ls_data}")

    with ThreadPool(N_THREAD) as threadpool:
        for _ in threadpool.imap_unordered(partial(process_threadpool, CPU_NO=CPU_NO), ls_data):
            pass


if __name__ == '__main__':

    # given data
    ls_data = list(range(N))

    # split data to pools
    ls_ls_data = split_list_to_pools(ls_data, N_CPU)
    print(f"data rearranged for pool: {ls_ls_data}")

    # process in parallel
    with Pool(N_CPU) as pool:
        for _ in pool.imap_unordered(process_pool, ls_ls_data):
            pass

    print("Program Ended!")

Output

There are only 3 distinct thread IDs rather than the expected 6.

$ python so.py
data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [0, 1, 2, 3, 4, 5]
Pool CPU_NO: 2, data: [6, 7, 8, 9, 10, 11]
Threadpool id: 140065165276928 CPU_NO: 1 / 2, data: 2
Threadpool id: 140065165276928 CPU_NO: 2 / 2, data: 8
Threadpool id: 140065182062336 CPU_NO: 2 / 2, data: 6
Threadpool id: 140065182062336 CPU_NO: 1 / 2, data: 0
Threadpool id: 140065173669632 CPU_NO: 2 / 2, data: 7
Threadpool id: 140065173669632 CPU_NO: 1 / 2, data: 1
Threadpool id: 140065165276928 CPU_NO: 1 / 2, data: 3
Threadpool id: 140065182062336 CPU_NO: 2 / 2, data: 10
Threadpool id: 140065182062336 CPU_NO: 1 / 2, data: 4
Threadpool id: 140065165276928 CPU_NO: 2 / 2, data: 9
Threadpool id: 140065173669632 CPU_NO: 1 / 2, data: 5
Threadpool id: 140065173669632 CPU_NO: 2 / 2, data: 11
Program Ended!

Edit: the code is being run on Debian 11.

You don't specify which platform you are running on, but I have to assume it is one that uses fork to create new processes (e.g. Linux); otherwise I don't believe your code would work correctly, because under spawn each process in the pool creates its own copy of the global CPU_QUEUE, and so every process would get the first item off the queue and believe it is CPU id 1.
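
Here is a minimal sketch of my own (not part of the original question) showing why spawn behaves this way: each spawned child re-imports the module and rebuilds module-level globals, so mutations made by the parent at run time are invisible to the workers. The dictionary SHARED stands in for the global CPU_QUEUE:

from multiprocessing import Pool, set_start_method

SHARED = {"value": 0}                # module-level global, rebuilt in each spawned child

def report(_):
    return SHARED["value"]           # reads this worker's own copy of the global

if __name__ == "__main__":
    set_start_method("spawn", force=True)
    SHARED["value"] = 42             # only the parent's copy is modified
    with Pool(2) as pool:
        print(pool.map(report, range(2)))   # prints [0, 0]; under fork it would be [42, 42]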

I therefore made two changes to the code:

  1. Made the code more portable across platforms by using a pool initializer to initialize the global variable CPU_QUEUE in each pool process with a single queue instance (this idiom is sketched in isolation right after this list).
  2. Introduced a call to time.sleep at the start of process_pool so that each process in the pool gets a chance to pick up one of the submitted tasks. Without this, a single pool process could in theory grab and process all of the submitted tasks; the sleep merely makes that less likely.
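
For reference, here is the initializer/initargs idiom from change 1 in isolation, as a minimal generic sketch of my own (the names init_worker, take_one and WORK_QUEUE are purely illustrative): the parent creates one queue and hands it to every worker process, where it is bound to a module-level global:

from multiprocessing import Pool, Queue

def init_worker(shared_queue):
    global WORK_QUEUE                # each worker process sets its own global...
    WORK_QUEUE = shared_queue        # ...but they all refer to the same Queue

def take_one(_):
    return WORK_QUEUE.get()          # every worker drains the one shared queue

if __name__ == "__main__":
    q = Queue()
    for item in "abcd":
        q.put(item)
    with Pool(2, initializer=init_worker, initargs=(q,)) as pool:
        print(sorted(pool.map(take_one, range(4))))   # ['a', 'b', 'c', 'd']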

When I ran the code under Linux, I saw essentially what you saw. But when I ran it under Windows, which the above changes now make possible, I saw:

data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [6, 7, 8, 9, 10, 11]
Pool CPU_NO: 2, data: [0, 1, 2, 3, 4, 5]
Threadpool id: 16924 CPU_NO: 1 / 2, data: 8
Threadpool id: 15260 CPU_NO: 1 / 2, data: 6
Threadpool id: 19800 CPU_NO: 2 / 2, data: 1
Threadpool id: 7580 CPU_NO: 2 / 2, data: 2
Threadpool id: 20368 CPU_NO: 1 / 2, data: 7
Threadpool id: 18736 CPU_NO: 2 / 2, data: 0
Threadpool id: 19800 CPU_NO: 2 / 2, data: 3
Threadpool id: 16924 CPU_NO: 1 / 2, data: 9
Threadpool id: 7580 CPU_NO: 2 / 2, data: 4
Threadpool id: 15260 CPU_NO: 1 / 2, data: 10
Threadpool id: 18736 CPU_NO: 2 / 2, data: 5
Threadpool id: 20368 CPU_NO: 1 / 2, data: 11
Program Ended!

This is what you were hoping to see. I can only conclude that under Linux, threading.get_ident returns values that are only unique within a single process. However, if you instead use threading.get_native_id(), which I have incorporated into the source below, that does appear to give 6 unique values, as hoped (a short standalone demo of the two identifier schemes follows this output):

data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [0, 1, 2, 3, 4, 5]
Pool CPU_NO: 2, data: [6, 7, 8, 9, 10, 11]
Threadpool id: 81 CPU_NO: 2 / 2, data: 7
Threadpool id: 83 CPU_NO: 2 / 2, data: 8
Threadpool id: 78 CPU_NO: 1 / 2, data: 0
Threadpool id: 79 CPU_NO: 2 / 2, data: 6
Threadpool id: 80 CPU_NO: 1 / 2, data: 1
Threadpool id: 82 CPU_NO: 1 / 2, data: 2
Threadpool id: 78 CPU_NO: 1 / 2, data: 3
Threadpool id: 83 CPU_NO: 2 / 2, data: 10
Threadpool id: 81 CPU_NO: 2 / 2, data: 9
Threadpool id: 79 CPU_NO: 2 / 2, data: 11
Threadpool id: 80 CPU_NO: 1 / 2, data: 4
Threadpool id: 82 CPU_NO: 1 / 2, data: 5
Program Ended!
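
To see the two identifier schemes side by side, here is a minimal Linux-only sketch of my own (it uses os.fork directly, so it is not part of the code above): get_ident is only guaranteed unique among threads within one process and can repeat across forked children, while get_native_id is the kernel-assigned thread id.

import os
import threading

def worker():
    # ident can coincide across the forked children; native id cannot
    print(f"pid={os.getpid()} ident={threading.get_ident()} "
          f"native={threading.get_native_id()}")

if __name__ == "__main__":
    for _ in range(2):
        if os.fork() == 0:           # child: run one thread, then leave
            t = threading.Thread(target=worker)
            t.start()
            t.join()
            os._exit(0)              # exit without running parent cleanup
    for _ in range(2):
        os.wait()                    # reap both children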

Revised source

from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import Queue
#from threading import get_ident
from threading import get_native_id
import random
from time import sleep
from functools import partial

# Params
N = 12
N_CPU = 2
N_THREAD = 3

def init_pool_processes(the_queue):
    global CPU_QUEUE
    CPU_QUEUE = the_queue

def split_list_to_pools(ls_data, n_pools):
    """Split data into pools as lists of approx. equal lengths."""
    n_each = int((len(ls_data) - 1) / n_pools) + 1
    return [ls_data[n_each * i:n_each * (i + 1)] for i in range(n_pools)]


def process_threadpool(data, CPU_NO=-1):
    """Process incoming data one-by-one"""
    sleep(3 + random.random())
    print(f"Threadpool id: {get_native_id()} CPU_NO: {CPU_NO} / {N_CPU}, data: {data}")


def process_pool(ls_data):
    """Process a list of data."""
    # Get initial pool status
    sleep(.2)
    CPU_NO = CPU_QUEUE.get()
    print(f"Pool CPU_NO: {CPU_NO}, data: {ls_data}")

    with ThreadPool(N_THREAD) as threadpool:
        for _ in threadpool.imap_unordered(partial(process_threadpool, CPU_NO=CPU_NO), ls_data):
            pass


if __name__ == '__main__':
    # Just for CPU numbering
    CPU_QUEUE = Queue(N_CPU)
    for i in range(1, 1 + N_CPU):
        CPU_QUEUE.put(i)

    # given data
    ls_data = list(range(N))

    # split data to pools
    ls_ls_data = split_list_to_pools(ls_data, N_CPU)
    print(f"data rearranged for pool: {ls_ls_data}")

    # process in parallel
    with Pool(N_CPU, initializer=init_pool_processes, initargs=(CPU_QUEUE,)) as pool:
        for _ in pool.imap_unordered(process_pool, ls_ls_data):
            pass

    print("Program Ended!")