Object identity through multiprocess spawn

I have been experimenting with the multiprocessing module in Python, and I am wondering how arguments are handled by the spawned processes for different parallelization approaches. This is the code I am using:

import os
import time
import multiprocessing


class StateClass:
    def __init__(self):
        self.state = 0

    def __call__(self):
        return f"I am {id(self)}: {self.state}"


CONTEXT = multiprocessing.get_context("fork")

nb_workers = 2

stato = StateClass()


def wrapped_work_function(a1, a2, sss, qqq):
    time.sleep(a1 + 1)
    if a1 == 0:
        sss.state = 0
    else:
        sss.state = 123
    for eee in a2:
        time.sleep(a1 + 1)
        sss.state += eee
        print(
            f"Worker {a1} in process {os.getpid()} (parent process {os.getppid()}): {eee}, {sss()}"
        )
    return sss


print("main", id(stato), stato)

manager = CONTEXT.Manager()
master_workers_queue = manager.Queue()

work_args_list = [
    (
        worker_index,
        [iii for iii in range(4)],
        stato,
        master_workers_queue,
    )
    for worker_index in range(nb_workers)
]

pool = CONTEXT.Pool(nb_workers)
result = pool.starmap_async(wrapped_work_function, work_args_list)

pool.close()
pool.join()
print("Finish")
bullo = result.get(timeout=100)
bullo.append(stato)
for sss in bullo:
    print(sss, id(sss), sss.state)

From it I get, for example, the following output:

main 140349939506416 <__main__.StateClass object at 0x7fa5c449dcf0>
Worker 0 in process 9075 (parent process 9047): 0, I am 140350069832528: 0
Worker 0 in process 9075 (parent process 9047): 1, I am 140350069832528: 1
Worker 1 in process 9077 (parent process 9047): 0, I am 140350069832528: 123
Worker 0 in process 9075 (parent process 9047): 2, I am 140350069832528: 3
Worker 0 in process 9075 (parent process 9047): 3, I am 140350069832528: 6
Worker 1 in process 9077 (parent process 9047): 1, I am 140350069832528: 124
Worker 1 in process 9077 (parent process 9047): 2, I am 140350069832528: 126
Worker 1 in process 9077 (parent process 9047): 3, I am 140350069832528: 129
Finish
<__main__.StateClass object at 0x7fa5c43ac190> 140349938516368 6
<__main__.StateClass object at 0x7fa5c43ac4c0> 140349938517184 129
<__main__.StateClass object at 0x7fa5c449dcf0> 140349939506416 0

The initial class instance stato has id 140349939506416 and, as I expected, that id stays the same for the whole lifetime of the script. Through the starmap_async call I do get two different instances of the same class (one per worker/process), which I can modify and whose state attribute is preserved until the end of the script. However, those instances initially have the same id (140350069832528), and at the end of the script they each have yet another id, which also differs from the id of the original instance. Doesn't having the same id mean that they share the same address in memory? How can they then keep different state? Is this behaviour related to the fork context?

First of all, when I run this (Debian Linux, Python 3.9.7), I do not see the same id for the sss instances in the two subprocesses:

main 140614771273680 <__main__.StateClass object at 0x7fe36d7defd0>
Worker 0 in process 19 (parent process 13): 0, I am 140614770671776: 0
Worker 0 in process 19 (parent process 13): 1, I am 140614770671776: 1
Worker 1 in process 20 (parent process 13): 0, I am 140614761373648: 123
Worker 0 in process 19 (parent process 13): 2, I am 140614770671776: 3
Worker 0 in process 19 (parent process 13): 3, I am 140614770671776: 6
Worker 1 in process 20 (parent process 13): 1, I am 140614761373648: 124
Worker 1 in process 20 (parent process 13): 2, I am 140614761373648: 126
Worker 1 in process 20 (parent process 13): 3, I am 140614761373648: 129
Finish
<__main__.StateClass object at 0x7fe36ce7b7f0> 140614761428976 6
<__main__.StateClass object at 0x7fe36ce7b520> 140614761428256 129
<__main__.StateClass object at 0x7fe36d7defd0> 140614771273680 0

Even though you are forking the new processes, the stato instances in the work_args_list list are passed to your worker function as sss. Passing arguments to a pool worker function, which runs in a different process/address space, is done with pickle, which serializes and then de-serializes the instance and therefore makes a copy that will, in general, have a different id once it has been de-serialized. In this particular case each process has also inherited the global variable stato, because the fork start method is used, and that variable should have the same id in all processes/address spaces. We can verify this by modifying wrapped_work_function to print out the id of stato:

def wrapped_work_function(a1, a2, sss, qqq):
    print('The id of the inherited stato is', id(stato))
    time.sleep(a1 + 1)
    if a1 == 0:
        sss.state = 0
    else:
        sss.state = 123
    for eee in a2:
        time.sleep(a1 + 1)
        sss.state += eee
        print(
            f"Worker {a1} in process {os.getpid()} (parent process {os.getppid()}): {eee}, {sss()}"
        )
    return sss

The printout is then:

main 140456701534160 <__main__.StateClass object at 0x7fbe9fcd1fd0>
The id of the inherited stato is 140456701534160
The id of the inherited stato is 140456701534160
Worker 0 in process 43 (parent process 37): 0, I am 140456700920112: 0
Worker 0 in process 43 (parent process 37): 1, I am 140456700920112: 1
Worker 1 in process 44 (parent process 37): 0, I am 140456700920112: 123
Worker 0 in process 43 (parent process 37): 2, I am 140456700920112: 3
Worker 0 in process 43 (parent process 37): 3, I am 140456700920112: 6
Worker 1 in process 44 (parent process 37): 1, I am 140456700920112: 124
Worker 1 in process 44 (parent process 37): 2, I am 140456700920112: 126
Worker 1 in process 44 (parent process 37): 3, I am 140456700920112: 129
Finish
<__main__.StateClass object at 0x7fbe9f36e880> 140456691689600 6
<__main__.StateClass object at 0x7fbe9f36eb20> 140456691690272 129
<__main__.StateClass object at 0x7fbe9fcd1fd0> 140456701534160 0

All address spaces see the same id for stato, namely 140456701534160. And if every address space sees the same id for the inherited stato, then the id of sss, which should be a separate copy of stato, cannot be the same as the id of stato. When I ran the code, the sss ids were indeed different, as I expected. But since each sss lives in a different address space, they may also happen to get the same id; it is just not guaranteed (in this second run they did come out the same).
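
The fork behaviour behind this can be demonstrated in isolation. Below is a minimal sketch (independent of the code above; it assumes the fork start method, so POSIX only): the child inherits the parent's address space copy-on-write, so id() of the inherited object is the same in both processes, yet changing it in the child leaves the parent's copy untouched.

import os
import multiprocessing

ctx = multiprocessing.get_context("fork")  # fork start method assumed (POSIX only)


class StateClass:
    def __init__(self):
        self.state = 0


stato = StateClass()


def child():
    # The forked child sees the inherited object at the same virtual address,
    # hence the same id() as in the parent ...
    stato.state = 999
    print("child ", os.getpid(), id(stato), stato.state)


if __name__ == "__main__":
    p = ctx.Process(target=child)
    p.start()
    p.join()
    # ... but the parent's copy lives in a different address space and is untouched.
    print("parent", os.getpid(), id(stato), stato.state)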

But even if the sss instances have the same id, and therefore the same address, the two instances live in two different processes and thus in two different address spaces. That is why they can hold different state. By the way, when your worker function returns sss, it is passed back to the main process with pickle, which serializes and de-serializes the instance and so effectively makes a copy of the original. That is why the ids of the returned instances are different.
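
To see that pickle round trip in isolation, here is a minimal sketch (plain pickle in a single process, not the pool machinery itself, which performs the same serialization for arguments and return values): de-serializing always builds a brand-new object with a different id, while the state attribute survives the copy.

import pickle


class StateClass:
    def __init__(self):
        self.state = 0


original = StateClass()
original.state = 129

# Serialize and de-serialize, as the pool does when shipping arguments and
# results between processes; this creates a new object.
copy = pickle.loads(pickle.dumps(original))

print(id(original), id(copy), original is copy)  # different ids, not the same object
print(copy.state)                                # 129: the state value is preserved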

Also: you have bullo = result.get(timeout=100) to guard against a possible timeout. But that statement is preceded by calls to pool.close() and pool.join(), and those two calls wait until all submitted tasks have completed. So by the time you call result.get, the tasks are guaranteed to be finished and the call can never raise a timeout exception.
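
If the timeout is meant to be effective, one possible rearrangement (just a sketch, not the only option) is to call result.get(timeout=...) before pool.join(), so the main process blocks on the result itself and a slow set of workers can actually raise multiprocessing.TimeoutError:

pool = CONTEXT.Pool(nb_workers)
result = pool.starmap_async(wrapped_work_function, work_args_list)
pool.close()  # no more tasks will be submitted

try:
    # Block on the result itself; this can now time out if the workers are too slow.
    bullo = result.get(timeout=100)
except multiprocessing.TimeoutError:
    pool.terminate()  # give up on the outstanding tasks
    raise

pool.join()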