Dynamic argument for multiprocessing
I have the following problem: I have a dictionary with a couple of hundred keys (still about 150 MB) where each key holds a complex value consisting of dictionaries, lists and single values. I have 3 incoming streams of information, with 1 s, 0.1 s and real-time timing depending on the data type. To speed up the processing I want to use multiprocessing to create 3 processes for the different sources, ideally each with its own pool for a further speed-up.
The question is how to "chop up" the common dictionary into pieces that can be updated. As I see it, when using a pool or a process I have to decide on the argument list right at the start, when the process/pool is initialized. What my task needs is something like this: I receive a message saying that key 'A' needs updating. I dispatch a worker to update it, passing it the message with the new information and the complex object for 'A' (or at least the relevant part of 'A''s value). I definitely do not want to pass the whole dictionary to every worker, since that takes a lot of memory.
In the example code below, I would want to pass only general_dict['A']['a'] when the first element of example_data_a is processed, general_dict['B']['a'] for the third one, and so on. The same goes for example_data_b. How should I pass the arguments?
import multiprocessing

general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                'B': {'a': [3, 4, 5], 'b': 'test2'},
                'C': {'a': [6, 7, 8], 'b': 'test3'}}
example_data_a = ['A', [2,1,2],
'A', [2,3,2],
'B', [3,0,5],
'C', [6,1,8]]
example_data_b = ['A', 'test11',
'B', 'test21',
'B', 'test22',
'C', 'test31']
def update_a(x):
...
def update_b(y):
...
if __name__ == "__main__":
    p1 = multiprocessing.Process(target=update_a)
    p2 = multiprocessing.Process(target=update_b)
p1.start()
p2.start()
p1.join()
p2.join()
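For reference, a direct way to hand each worker only the relevant slice is to keep a single dispatcher in the parent process and submit one job per key to a Pool, passing just general_dict[key]['a']. This is only a minimal sketch of that idea, not code from the question: the names update_list and dispatch are hypothetical, the merge step is a stand-in, and it ignores the streaming aspect. The key point it illustrates is that each worker receives a pickled copy, so mutating the slice inside the worker does not change the parent's dictionary; the new value must be returned and merged back.

import multiprocessing

def update_list(key, old_list, new_list):
    # Hypothetical merge step; here we simply replace the old list.
    return key, new_list

def dispatch(pool, general_dict, data):
    # data is a flat list of alternating keys and values, as in example_data_a.
    async_results = [
        pool.apply_async(update_list, (key, general_dict[key]['a'], value))
        for key, value in zip(data[::2], data[1::2])
    ]
    # The workers got pickled copies, so the parent's dictionary is only
    # updated when the returned values are merged back here:
    for r in async_results:
        key, new_list = r.get()
        general_dict[key]['a'] = new_list

if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    example_data_a = ['A', [2, 1, 2], 'A', [2, 3, 2],
                      'B', [3, 0, 5], 'C', [6, 1, 8]]
    with multiprocessing.Pool() as pool:
        dispatch(pool, general_dict, example_data_a)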
I see your point. But the problem is that any of the possible keys may come through any of the three streams, so that doesn't sound like the most viable approach. The way I see it, you should have one process handling the input stream. And there should be no need to split the dictionary. Instead, you have three processes, each handling a third of the keys as you have envisioned. Each process is started up front and is passed its own multiprocessing.Queue instance as an input queue, and they all pass return values back through a common result queue. A thread started by the main process continually gets from the result queue and updates the dictionary with the returned values.
This is the general idea:
from multiprocessing import Process, Queue
from threading import Thread
def update_a(input_queue, result_queue):
while True:
# Wait for next request:
x = input_queue.get()
if x is None:
# This is a Sentinel indicating a request to terminate.
# Put sentinel to result queue to let the results_thread know
# that there are no more results coming from this process
result_queue.put(None)
return
# Process:
...
# Put result on the result queue:
result_queue.put(result)
def update_b(input_queue, result_queue):
while True:
# Wait for next request:
y = input_queue.get()
if y is None:
# This is a Sentinel indicating a request to terminate.
# Put sentinel to result queue to let the results_thread know
# that there are no more results coming from this process
result_queue.put(None)
return
# Process:
...
# Put result on the result queue:
result_queue.put(result)
def update_c(input_queue, result_queue):
while True:
# Wait for next request:
z = input_queue.get()
        if z is None:
# This is a Sentinel indicating a request to terminate.
# Put sentinel to result queue to let the results_thread know
# that there are no more results coming from this process
result_queue.put(None)
return
# Process:
...
# Put result on the result queue:
result_queue.put(result)
def process_results():
sentinels_seen = 0
# Have all 3 processes finished?
while sentinels_seen < 3:
# Get next result
result = result_queue.get()
if result is None:
# Sentinel
sentinels_seen += 1
else:
# Update general_dict with result:
...
def process_input_stream():
while True:
# When we have decided that we are through processing input
# break out of the loop:
if through_processing:
break
# Get input from one of 3 sources and depending on key
# put the "argument" to either a_q, b_q or c_q to be handled respectively
# by either update_a, update_b or update_c.
# The result will be put to result queue which will be processed by our
# process_results thread.
...
# Add a sentinel to each of the input queues:
a_q.put(None)
b_q.put(None)
c_q.put(None)
if __name__ == "__main__":
# Building the general_dict should be protected by if __name__ == "__main__":
general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
'B': {'a': [3, 4, 5], 'b': 'test2'},
'C': {'a': [6, 7, 8], 'b': 'test3'}}
a_q, b_q, c_q = Queue(), Queue(), Queue()
result_queue = Queue()
p1 = Process(target=update_a, args=(a_q, result_queue))
p2 = Process(target=update_b, args=(b_q, result_queue))
p3 = Process(target=update_c, args=(c_q, result_queue))
t = Thread(target=process_results)
p1.start()
p2.start()
    p3.start()
    t.start()
process_input_stream()
p1.join()
p2.join()
p3.join()
t.join()
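To make the elided parts concrete, here is one hypothetical way to fill in a worker body and the dictionary update in process_results. The (key, new_list) input format, the (key, subkey, value) result format, and the sort used as a stand-in computation are all assumptions for illustration, not something fixed by the design above:

def update_a(input_queue, result_queue):
    while True:
        x = input_queue.get()
        if x is None:
            # Sentinel: tell the results thread this process is done.
            result_queue.put(None)
            return
        # Hypothetical message format: (key, new_list)
        key, new_list = x
        # Do the actual (possibly expensive) work here; as a stand-in
        # we just sort the new list:
        result_queue.put((key, 'a', sorted(new_list)))

def process_results():
    # Runs as a thread in the main process, so it can read the
    # module-level result_queue and mutate general_dict directly:
    sentinels_seen = 0
    while sentinels_seen < 3:
        result = result_queue.get()
        if result is None:
            sentinels_seen += 1
        else:
            # Merge the returned value back into the dictionary:
            key, subkey, value = result
            general_dict[key][subkey] = value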
Note:
If you find there is too much contention between the process_results thread and the process_input_stream loop, because the GIL prevents the latter from keeping up with the input stream, then do not start and join a process_results thread. Instead, just start and join the three processes as before and finally call process_results as a function in the main process. You will, of course, lose whatever concurrency the thread provided:
if __name__ == "__main__":
# Building the general_dict should be protected by if __name__ == "__main__":
general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
'B': {'a': [3, 4, 5], 'b': 'test2'},
'C': {'a': [6, 7, 8], 'b': 'test3'}}
a_q, b_q, c_q = Queue(), Queue(), Queue()
result_queue = Queue()
p1 = Process(target=update_a, args=(a_q, result_queue))
p2 = Process(target=update_b, args=(b_q, result_queue))
p3 = Process(target=update_c, args=(c_q, result_queue))
p1.start()
p2.start()
p3.start()
process_input_stream()
p1.join()
p2.join()
p3.join()
process_results()
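Finally, the routing step inside process_input_stream is left open above. One hypothetical way to split the keys across the three workers, as suggested, is to map each key deterministically to one of the queues by hashing it; all updates for a given key then go to the same process and stay in order. In this sketch the signature differs from the original (the queues are passed in as a list), and read_next_message and the (key, payload) message format are assumptions for illustration:

def process_input_stream(input_queues):
    # Routing happens only in the main process, so Python's per-process
    # hash randomization does not affect the key-to-queue mapping here.
    while True:
        message = read_next_message()   # hypothetical: poll the 3 sources
        if message is None:             # hypothetical: no more input
            break
        key, payload = message
        # Same key -> same queue -> same worker, so updates for one key
        # are always processed in arrival order:
        input_queues[hash(key) % len(input_queues)].put((key, payload))
    # Add a sentinel to each of the input queues:
    for q in input_queues:
        q.put(None)

# Called from the main process as: process_input_stream([a_q, b_q, c_q])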