从多处理中使用池时如何在函数中获取进程号

How to get the process number in the function when using Pool from multiprocessing

当使用池进行多处理时,我试图在我的函数中获取当前进程号。这是我用来测试这个的代码:

from multiprocessing.dummy import Pool
import itertools

def function(a,b,c):
    print("Value of a: {} Value of b : {} Constant : {}".format(a,b,c))

a = [4,5,6,7,8]
b = [11,12,13,14,15]

pool = Pool(3)
pool.starmap(function, zip(a,b,itertools.repeat(50)))
pool.close()
pool.join()

现在我的函数输出如下所示:

Value of a: 4 Value of b : 11 Constant : 50
...

我真正想要的是在我的函数中也获取当前进程编号,以准确通知我哪个进程是 运行 函数的当前迭代 像这样:

Value of a: 4 Value of b : 11 Constant : 50 Process : 1
Value of a: 5 Value of b : 12 Constant : 50 Process : 2
Value of a: 6 Value of b : 13 Constant : 50 Process : 3

我尝试使用 multiprocessing.current_process().ident 但它显示了这个输出:

Value of a: 4 Value of b : 11 Constant : 50 Thread : 33084
Value of a: 5 Value of b : 12 Constant : 50 Thread : 33084
Value of a: 6 Value of b : 13 Constant : 50 Thread : 33084

我应该使用 multiprocessing 中的任何其他方法或属性来获取当前进程号吗?

您正在使用 multiprocessing.dummy.Pool,即 actually a thread pool,而不是进程池。因此,所有内容仍然 运行 在单个进程中,这意味着每个线程将具有与 multiprocesing.current_process() 相同的 ident 值。如果您打算使用线程池,则可以使用 threading.current_thread().ident 为每个线程获取唯一 ID。

如果您打算使用进程池,那么一旦切换,multiprocessing.current_process().ident 将按照您期望的方式工作。您也可以使用 os.getpid(),它(至少在 Linux 上)returns 具有相同的值。

如果您希望每个线程都有一个从 1 开始递增的单调递增 ID,您可以通过在每个线程启动时自行分配标识符来实现,如下所示:

from multiprocessing.dummy import Pool
import itertools

def function(a,b,c):
    print("Value of a: {} Value of b : {} Constant : {} ID: {}".format(a,b,c,d.id))

a = [4,5,6,7,8]
b = [11,12,13,14,15]

d = threading.local()
def set_num(counter):
    d.id = next(counter) + 1

pool = Pool(3, initializer=set_num, initargs=(itertools.count(),))

pool.starmap(function, zip(a,b,itertools.repeat(50)))
pool.close()
pool.join()

itertools.count()是线程安全的,所以可以用来在池中的每个线程初始化的时候分配一个唯一的标识符。然后,您可以使用 threading.local 对象来存储每个线程的唯一 ID。

如果您不关心实际有一个整数值,您可以只使用 threading.current_thread().name,它将打印一个具有整数后缀的字符串,从 1 开始计数。