Dynamic argument for multiprocessing

I have the following problem: I have a dictionary with a few hundred keys (still about 150 MB), and each key holds a complex value made up of dictionaries, lists and single values. I have 3 incoming streams of information, timed at 1 second, 0.1 second and real time, depending on the data type. To speed up the data processing I want to use multiprocessing and create 3 processes for the different sources, ideally each with its own pool to speed things up further.

The question is how to "chop" the shared dictionary into pieces that can be updated. As I see it, when using a pool or process I have to fix the argument list up front, when the process/pool is initialized. What my task needs is something like this: I receive a message saying that key 'A' needs to be updated. I assign a worker to update it, passing it the message with the new information together with the complex object for 'A' (or at least the relevant part of 'A''s value). I definitely do not want to pass the whole dictionary to every worker, since that would eat a lot of memory.

In this example code, I want to pass only general_dict['A']['a'] while example_data_a is being processed, general_dict['B']['a'] for the third entry, and so on. The same applies to example_data_b. How should I pass the arguments?

import multiprocessing

general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                'B': {'a': [3, 4, 5], 'b': 'test2'},
                'C': {'a': [6, 7, 8], 'b': 'test3'}}

example_data_a = ['A', [2,1,2],
                  'A', [2,3,2],
                  'B', [3,0,5],
                  'C', [6,1,8]]

example_data_b = ['A', 'test11',
                  'B', 'test21',
                  'B', 'test22',
                  'C', 'test31']

def update_a(x):
    ...

def update_b(y):
    ...

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=update_a)
    p2 = multiprocessing.Process(target=update_b)
    p1.start()
    p2.start()
    p1.join()
    p2.join()

I see your point. But the problem is that every possible key can come through all three streams, so that does not sound like the most viable approach.

The way I see it, you should have a single process handling the input streams. Moreover, there should be no need to split the dictionary. Instead, you have the three processes you envisioned, each handling its share of the keys. Each process is started once at the beginning and is passed its own multiprocessing.Queue instance as its input queue, and they all pass back return values via a common result queue. A thread started by the main process continually reads the result queue and updates the dictionary with the returned values.

Here is the general idea:

from multiprocessing import Process, Queue
from threading import Thread


def update_a(input_queue, result_queue):
    while True:
        # Wait for next request:
        x = input_queue.get()
        if x is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)

def update_b(input_queue, result_queue):
    while True:
        # Wait for next request:
        y = input_queue.get()
        if y is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)

def update_c(input_queue, result_queue):
    while True:
        # Wait for next request:
        z = input_queue.get()
        if z is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)


def process_results():
    sentinels_seen = 0
    # Have all 3 processes finished?
    while sentinels_seen < 3:
        # Get next result
        result = result_queue.get()
        if result is None:
            # Sentinel
            sentinels_seen += 1
        else:
            # Update general_dict with result:
            ...

def process_input_stream():
    while True:
        # When we have decided that we are through processing input
        # break out of the loop:
        if through_processing:
            break
        # Get input from one of 3 sources and depending on key
        # put the "argument" to either a_q, b_q or c_q to be handled respectively
        # by either update_a, update_b or update_c.
        # The result will be put to result queue which will be processed by our
        # process_results thread.
        ...

    # Add a sentinel to each of the input queues:
    a_q.put(None)
    b_q.put(None)
    c_q.put(None)

if __name__ == "__main__":
    # Building the general_dict should be protected by if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    a_q, b_q, c_q = Queue(), Queue(), Queue()
    result_queue = Queue()
    p1 = Process(target=update_a, args=(a_q, result_queue))
    p2 = Process(target=update_b, args=(b_q, result_queue))
    p3 = Process(target=update_c, args=(c_q, result_queue))
    t = Thread(target=process_results)
    p1.start()
    p2.start()
    p3.start()
    t.start()

    process_input_stream()

    p1.join()
    p2.join()
    p3.join()
    t.join()
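
The elided steps (the ... lines) in the workers and in process_results depend on the message format, which is left open above. As a minimal sketch, assume each request put on an input queue is a (key, field, payload) tuple and each result a (key, field, value) tuple; both formats are assumptions for illustration, not part of the original design. update_a and process_results could then be fleshed out like this:

def update_a(input_queue, result_queue):
    while True:
        # Wait for next request:
        x = input_queue.get()
        if x is None:
            # Propagate the sentinel to the results thread:
            result_queue.put(None)
            return
        key, field, payload = x          # e.g. ('A', 'a', [2, 1, 2])
        # Real processing of the payload would go here; this sketch
        # simply passes it through as the new value:
        result_queue.put((key, field, payload))

def process_results():
    sentinels_seen = 0
    while sentinels_seen < 3:
        result = result_queue.get()
        if result is None:
            sentinels_seen += 1
        else:
            key, field, value = result
            # This thread runs in the main process, so it can update
            # general_dict directly:
            general_dict[key][field] = value

Because the updating thread lives in the main process, only the small request and result tuples ever cross a process boundary; the 150 MB dictionary itself is never pickled to a worker, which is exactly what the question asked to avoid.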

Note:

If you find there is too much contention between the process_results thread and the process_input_stream loop because the GIL prevents the latter from keeping up with the input streams, then do not start and join a process_results thread. Instead, just start and join the three processes as before and finally call process_results as a function in the main process. Of course, you lose any concurrency that way:

if __name__ == "__main__":
    # Building the general_dict should be protected by if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    a_q, b_q, c_q = Queue(), Queue(), Queue()
    result_queue = Queue()
    p1 = Process(target=update_a, args=(a_q, result_queue))
    p2 = Process(target=update_b, args=(b_q, result_queue))
    p3 = Process(target=update_c, args=(c_q, result_queue))
    p1.start()
    p2.start()
    p3.start()

    process_input_stream()

    p1.join()
    p2.join()
    p3.join()

    process_results()
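
Similarly, the routing elided in process_input_stream could look like the sketch below. Here get_message() is a hypothetical helper that blocks until the next message arrives from any of the three sources and returns None once all of them are exhausted, and the hash-based partitioning is likewise an assumption, chosen so that each worker owns a fixed third of the keyspace:

def process_input_stream():
    queues = [a_q, b_q, c_q]
    while True:
        msg = get_message()              # hypothetical helper, see above
        if msg is None:                  # all three sources are exhausted
            break
        key, field, payload = msg
        # Route by key so the same key always goes to the same worker;
        # hash() is stable within a single run, which is all we need:
        queues[hash(key) % 3].put((key, field, payload))

    # Add a sentinel to each of the input queues:
    a_q.put(None)
    b_q.put(None)
    c_q.put(None)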