在 Python 中的进程之间共享许多队列

Sharing many queues among processes in Python

我知道 multiprocessing.Manager() 以及如何使用它来创建共享对象,特别是可以在工作人员之间共享的队列。有this question, this question, this question and even .

但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列由变量 key.

标识

我想在需要放置和获取数据时使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多东西。 multiprocessing 导入为 mp:

在多处理模块导入的配置文件中定义像 for key in all_keys: DICT[key] = mp.Queue 这样的字典(称为 multi.py)不会 return 错误,但队列 DICT[key] 在进程之间不共享,每个进程似乎都有自己的队列副本,因此没有通信发生。

如果我尝试在定义进程并启动它们的主要多处理函数的开头定义 DICT,例如

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()

我收到错误

RuntimeError: Queue objects should only be shared between processes through
 inheritance

改为

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()

只会让一切变得更糟。在 multi.py 的头部而不是在主函数内部尝试类似的定义 returns 类似的错误。

必须有一种方法可以在进程之间共享多个队列,而无需在代码中明确命名每个队列。有什么想法吗?

编辑

这是程序的基本架构:

1- 加载第一个模块,它定义了一些变量,导入 multi,启动 multi.main(),然后加载另一个模块,该模块开始级联模块加载和代码执行。同时...

2- multi.main 看起来像这样:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()

    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
        proc_2 =  pool.apply_async(targ2,(DICT2[key], otherargs,) 

我没有使用 poolmanager,而是使用以下启动进程:

mp.Process(target=targ1, args=(DICT[key],))

3 - 函数 targ1 从主进程获取输入数据(按 key 排序)。它旨在将结果传递给 DICT[key],以便 targ2 可以完成它的工作。这是不起作用的部分。有任意数量的 targ1s、targ2s 等,因此有任意数量的队列。

4 - 其中一些过程的结果将被发送到一堆不同的数组/pandas 数据帧,它们也由 key 索引,我希望可以从任意进程,甚至是在不同模块中启动的进程。我还没有写这部分,这可能是一个不同的问题。 (我在这里提到它是因为上面 3 的答案也可能很好地解决 4。)

听起来您的问题似乎是在您尝试通过将 multiprocessing.Queue() 作为参数传递来共享时开始的。您可以通过创建一个 managed queue 来解决这个问题:

import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()

当您使用管理器创建它时,您正在存储并传递一个 代理 到队列,而不是队列本身,所以即使您传递给对象你的工作进程是一个副本,它仍然会指向相同的底层数据结构:你的队列。它与 C/C++ 中的指针非常相似(在概念上)。如果您以这种方式创建队列,您将能够在启动工作进程时传递它们。

由于您现在可以传递队列,因此您不再需要管理字典。在 main 中保留一个普通字典,它将存储所有映射,并且只为您的工作进程提供他们需要的队列,因此他们不需要访问任何映射。

我在这里写了一个例子。看起来你正在你的工人之间传递对象,所以这就是这里所做的。假设我们有两个处理阶段,数据的开始和结束都在 main 的控制中。看看我们如何创建像管道一样连接工作人员的队列,但是通过给他们只有他们需要的队列,他们不需要知道任何映射:

import multiprocessing as mp

def stage1(q_in, q_out):

    q_out.put(q_in.get()+"Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):

    q_out.put(q_in.get()+"Stage 2 did some work.\n")
    return

def main():

    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get()+"Main finished the job.")

    pool.close()
    pool.join()

    return

if __name__ == "__main__":
    main()

代码生成此输出:

Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.

我没有包含在字典中存储队列或 AsyncResults 对象的示例,因为我仍然不太了解您的程序应该如何工作。但是现在您可以自由地传递队列,您可以构建字典来根据需要存储 queue/process 映射。

事实上,如果您确实在多个工作线程之间构建管道,您甚至不需要保留对 main 中 "inter-worker" 队列的引用。创建队列,将它们传递给您的工作人员,然后仅保留对 main 将使用的队列的引用。如果您确实有 "an arbitrary number" 个队列,我肯定会建议您尽快对旧队列进行垃圾回收。