在 Python 中的进程之间共享许多队列
Sharing many queues among processes in Python
我知道 multiprocessing.Manager()
以及如何使用它来创建共享对象,特别是可以在工作人员之间共享的队列。有this question, this question, this question and even .
但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列由变量 key
.
标识
我想在需要放置和获取数据时使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多东西。 multiprocessing
导入为 mp
:
在多处理模块导入的配置文件中定义像 for key in all_keys: DICT[key] = mp.Queue
这样的字典(称为 multi.py
)不会 return 错误,但队列 DICT[key]
在进程之间不共享,每个进程似乎都有自己的队列副本,因此没有通信发生。
如果我尝试在定义进程并启动它们的主要多处理函数的开头定义 DICT
,例如
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
我收到错误
RuntimeError: Queue objects should only be shared between processes through
inheritance
改为
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
只会让一切变得更糟。在 multi.py
的头部而不是在主函数内部尝试类似的定义 returns 类似的错误。
必须有一种方法可以在进程之间共享多个队列,而无需在代码中明确命名每个队列。有什么想法吗?
编辑
这是程序的基本架构:
1- 加载第一个模块,它定义了一些变量,导入 multi
,启动 multi.main()
,然后加载另一个模块,该模块开始级联模块加载和代码执行。同时...
2- multi.main
看起来像这样:
def main():
manager = mp.Manager()
pool = mp.Pool()
DICT2 = manager.dict()
for key in all_keys:
DICT2[key] = manager.Queue()
proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
proc_2 = pool.apply_async(targ2,(DICT2[key], otherargs,)
我没有使用 pool
和 manager
,而是使用以下启动进程:
mp.Process(target=targ1, args=(DICT[key],))
3 - 函数 targ1
从主进程获取输入数据(按 key
排序)。它旨在将结果传递给 DICT[key]
,以便 targ2
可以完成它的工作。这是不起作用的部分。有任意数量的 targ1
s、targ2
s 等,因此有任意数量的队列。
4 - 其中一些过程的结果将被发送到一堆不同的数组/pandas 数据帧,它们也由 key
索引,我希望可以从任意进程,甚至是在不同模块中启动的进程。我还没有写这部分,这可能是一个不同的问题。 (我在这里提到它是因为上面 3 的答案也可能很好地解决 4。)
听起来您的问题似乎是在您尝试通过将 multiprocessing.Queue()
作为参数传递来共享时开始的。您可以通过创建一个 managed queue 来解决这个问题:
import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()
当您使用管理器创建它时,您正在存储并传递一个 代理 到队列,而不是队列本身,所以即使您传递给对象你的工作进程是一个副本,它仍然会指向相同的底层数据结构:你的队列。它与 C/C++ 中的指针非常相似(在概念上)。如果您以这种方式创建队列,您将能够在启动工作进程时传递它们。
由于您现在可以传递队列,因此您不再需要管理字典。在 main 中保留一个普通字典,它将存储所有映射,并且只为您的工作进程提供他们需要的队列,因此他们不需要访问任何映射。
我在这里写了一个例子。看起来你正在你的工人之间传递对象,所以这就是这里所做的。假设我们有两个处理阶段,数据的开始和结束都在 main
的控制中。看看我们如何创建像管道一样连接工作人员的队列,但是通过给他们只有他们需要的队列,他们不需要知道任何映射:
import multiprocessing as mp
def stage1(q_in, q_out):
q_out.put(q_in.get()+"Stage 1 did some work.\n")
return
def stage2(q_in, q_out):
q_out.put(q_in.get()+"Stage 2 did some work.\n")
return
def main():
pool = mp.Pool()
manager = mp.Manager()
# create managed queues
q_main_to_s1 = manager.Queue()
q_s1_to_s2 = manager.Queue()
q_s2_to_main = manager.Queue()
# launch workers, passing them the queues they need
results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))
# Send a message into the pipeline
q_main_to_s1.put("Main started the job.\n")
# Wait for work to complete
print(q_s2_to_main.get()+"Main finished the job.")
pool.close()
pool.join()
return
if __name__ == "__main__":
main()
代码生成此输出:
Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.
我没有包含在字典中存储队列或 AsyncResults
对象的示例,因为我仍然不太了解您的程序应该如何工作。但是现在您可以自由地传递队列,您可以构建字典来根据需要存储 queue/process 映射。
事实上,如果您确实在多个工作线程之间构建管道,您甚至不需要保留对 main
中 "inter-worker" 队列的引用。创建队列,将它们传递给您的工作人员,然后仅保留对 main
将使用的队列的引用。如果您确实有 "an arbitrary number" 个队列,我肯定会建议您尽快对旧队列进行垃圾回收。
我知道 multiprocessing.Manager()
以及如何使用它来创建共享对象,特别是可以在工作人员之间共享的队列。有this question, this question, this question and even
但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列由变量 key
.
我想在需要放置和获取数据时使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多东西。 multiprocessing
导入为 mp
:
在多处理模块导入的配置文件中定义像 for key in all_keys: DICT[key] = mp.Queue
这样的字典(称为 multi.py
)不会 return 错误,但队列 DICT[key]
在进程之间不共享,每个进程似乎都有自己的队列副本,因此没有通信发生。
如果我尝试在定义进程并启动它们的主要多处理函数的开头定义 DICT
,例如
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
我收到错误
RuntimeError: Queue objects should only be shared between processes through
inheritance
改为
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
只会让一切变得更糟。在 multi.py
的头部而不是在主函数内部尝试类似的定义 returns 类似的错误。
必须有一种方法可以在进程之间共享多个队列,而无需在代码中明确命名每个队列。有什么想法吗?
编辑
这是程序的基本架构:
1- 加载第一个模块,它定义了一些变量,导入 multi
,启动 multi.main()
,然后加载另一个模块,该模块开始级联模块加载和代码执行。同时...
2- multi.main
看起来像这样:
def main():
manager = mp.Manager()
pool = mp.Pool()
DICT2 = manager.dict()
for key in all_keys:
DICT2[key] = manager.Queue()
proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
proc_2 = pool.apply_async(targ2,(DICT2[key], otherargs,)
我没有使用 pool
和 manager
,而是使用以下启动进程:
mp.Process(target=targ1, args=(DICT[key],))
3 - 函数 targ1
从主进程获取输入数据(按 key
排序)。它旨在将结果传递给 DICT[key]
,以便 targ2
可以完成它的工作。这是不起作用的部分。有任意数量的 targ1
s、targ2
s 等,因此有任意数量的队列。
4 - 其中一些过程的结果将被发送到一堆不同的数组/pandas 数据帧,它们也由 key
索引,我希望可以从任意进程,甚至是在不同模块中启动的进程。我还没有写这部分,这可能是一个不同的问题。 (我在这里提到它是因为上面 3 的答案也可能很好地解决 4。)
听起来您的问题似乎是在您尝试通过将 multiprocessing.Queue()
作为参数传递来共享时开始的。您可以通过创建一个 managed queue 来解决这个问题:
import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()
当您使用管理器创建它时,您正在存储并传递一个 代理 到队列,而不是队列本身,所以即使您传递给对象你的工作进程是一个副本,它仍然会指向相同的底层数据结构:你的队列。它与 C/C++ 中的指针非常相似(在概念上)。如果您以这种方式创建队列,您将能够在启动工作进程时传递它们。
由于您现在可以传递队列,因此您不再需要管理字典。在 main 中保留一个普通字典,它将存储所有映射,并且只为您的工作进程提供他们需要的队列,因此他们不需要访问任何映射。
我在这里写了一个例子。看起来你正在你的工人之间传递对象,所以这就是这里所做的。假设我们有两个处理阶段,数据的开始和结束都在 main
的控制中。看看我们如何创建像管道一样连接工作人员的队列,但是通过给他们只有他们需要的队列,他们不需要知道任何映射:
import multiprocessing as mp
def stage1(q_in, q_out):
q_out.put(q_in.get()+"Stage 1 did some work.\n")
return
def stage2(q_in, q_out):
q_out.put(q_in.get()+"Stage 2 did some work.\n")
return
def main():
pool = mp.Pool()
manager = mp.Manager()
# create managed queues
q_main_to_s1 = manager.Queue()
q_s1_to_s2 = manager.Queue()
q_s2_to_main = manager.Queue()
# launch workers, passing them the queues they need
results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))
# Send a message into the pipeline
q_main_to_s1.put("Main started the job.\n")
# Wait for work to complete
print(q_s2_to_main.get()+"Main finished the job.")
pool.close()
pool.join()
return
if __name__ == "__main__":
main()
代码生成此输出:
Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.
我没有包含在字典中存储队列或 AsyncResults
对象的示例,因为我仍然不太了解您的程序应该如何工作。但是现在您可以自由地传递队列,您可以构建字典来根据需要存储 queue/process 映射。
事实上,如果您确实在多个工作线程之间构建管道,您甚至不需要保留对 main
中 "inter-worker" 队列的引用。创建队列,将它们传递给您的工作人员,然后仅保留对 main
将使用的队列的引用。如果您确实有 "an arbitrary number" 个队列,我肯定会建议您尽快对旧队列进行垃圾回收。