如何并行 运行 不同的方法并获取输出以进行进一步处理?

How to run different methods in parallel and obtain the outputs for further processing?

我想了解如何运行 parelle 中的两个核心密集函数。这些函数需要相同的输入并产生不同的输出。

上次使用多处理库时,我将结果保存到文件中,不需要进一步处理。

下面是一个简单的代码类型示例。如何并行化函数,以便输出 a 和 b 都可以用于进一步处理? 我正在使用 Python 2.7.

input_dict = {'key1':'value1','key2':'value2','key3':'value3'}

def func1(dictionary):
    # do some work
    return np.array()

def func2(dictionary):
    # do different work
    return np.array()

a = func1(input_dict)
b = func2(input_dict)

result = np.dot(a, b)

下面的代码是 运行 两个函数一起运行并收集它们的输出的正确方法吗?

from multiprocessing import Process, Queue

input_dict = {'key1':'value1','key2':'value2','key3':'value3'}

def func1(dictionary):
    # do some work
    return q.put(np.array())

def func2(dictionary):
    # do different work
    return q.put(np.array())


if __name__ == '__main__':
    q1 = Queue()
    q2 = Queue()
    p1 = Process(target=func1, args=(input_dict,))
    p2 = Process(target=func2, args=(input_dict,))
    p1.start()
    p2.start()

    a = q1.get()
    b = q2.get()
    p1.join()
    p2.join()
    result = np.dot(a, b)

大部分是正确的,但是在初始化进程时必须通过 q1q2

此外,将 return q.put(np.array()) 替换为 q.put(np.array())

当得到像 a = q1.get() 这样的结果队列时,请记住你只是从队列中得到一个项目,而不是一个列表。

如果你需要提取列表,你可以这样做:

from multiprocessing import Process, Queue
import numpy as np

input_dict = {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}


def func1(dictionary, q):
    # do some work
    for x in xrange(5):
        q.put(
            np.random.randint(2, size=20)
        )
    q.put('STOP')


def func2(dictionary, q):
    # do different work
    q.put(
        np.array([1, 1, 1, 1, 0, 1, 1, 0, 1, 0,
                  1, 0, 1, 0, 1, 0, 0, 1, 1, 1])
    )


if __name__ == '__main__':
    q1 = Queue()
    q2 = Queue()
    p1 = Process(target=func1, args=(input_dict, q1))
    p2 = Process(target=func2, args=(input_dict, q2))
    p1.start()
    p2.start()

    f1_results = []
    # Keep adding items from func1 to the results list, until it sees a 'STOP' item
    for array in iter(q1.get, 'STOP'):
        f1_results.append(array)

    b = q2.get()
    p1.join()
    p2.join()

    for a in f1_results:
        result = np.dot(a, b)
        print result