Python:如何在多处理情况下 return 值?

Python: How to return values in a multiprocessing situation?

假设我收集了 Process-es,a[0]a[m]

然后这些进程将通过队列将作业发送到 Process-es 的另一个集合,b[0]b[n],其中 m > n

或者,如图所示:

a[0], a[1], ..., a[m] ---Queue---> b[0], b[1], ..., b[n]

现在,我如何return b 进程的结果到相关的 a 进程?

我的第一个猜测是使用 multiprocessing.Pipe()

因此,我尝试执行以下操作:

## On the 'a' side
pipe = multiprocessing.Pipe()
job['pipe'] = pipe
queue.put(job)
rslt = pipe[0].recv()

## On the 'b' side
job = queue.get()
... process the job ...
pipe = job['pipe']
pipe.send(result)

并且它不适用于错误:Required argument 'handle' (pos 1) not found

阅读了很多文档,我想到了:

## On the 'a' side
pipe = multiprocessing.Pipe()
job['pipe'] = multiprocessing.reduction.reduce_connection(pipe[1])
queue.put(job)
rslt = pipe[0].recv()

## On the 'b' side
job = queue.get()
... process the job ...
pipe = multiprocessing.reduction.rebuild_connection(job['pipe'], True, True)
pipe.send(result)

现在我得到一个不同的错误:ValueError: need more than 2 values to unpack

我找了又找,还是找不到如何正确使用 reduce_rebuild_ 方法。

请帮助我 return 从 ba 的值。

我建议避免使用这种管道和文件描述符的移动(我上次尝试时,它不是很标准,也没有很好的记录)。不得不处理它是一种痛苦,我不推荐它:-/

我建议采用不同的方法:让 main 管理连接。保留一个工作队列,但以不同的路径发送响应。这意味着您需要某种线程标识符。我将提供一个玩具实现来说明我的建议:

#!/usr/bin/env python

import multiprocessing
import random

def fib(n):
    "Slow fibonacci implementation because why not"
    if n < 2:
        return n
    return fib(n-2) + fib(n-1)


def process_b(queue_in, queue_out):
    print "Starting process B"
    while True:
        j = queue_in.get()
        print "Job: %d" % j["val"]
        j["result"] = fib(j["val"])
        queue_out.put(j)


def process_a(index, pipe_end, queue):
    print "Starting process A"
    value = random.randint(5, 50)
    j = {
        "a_id": index,
        "val": value,
    }

    queue.put(j)
    r = pipe_end.recv()

    print "Process A sent value %d and received: %s" % (value, r)


def main():
    print "Starting main"

    a_pipes = list()
    jobs = multiprocessing.Queue()
    done_jobs = multiprocessing.Queue()

    for i in range(5):
        multiprocessing.Process(target=process_b, args=(jobs, done_jobs,)).start()

    for i in range(10):
        receiver, sender = multiprocessing.Pipe(duplex=False)
        a_pipes.append(sender)
        multiprocessing.Process(target=process_a, args=(i, receiver, jobs)).start()

    while True:
        j = done_jobs.get()
        a_pipes[j["a_id"]].send(j["result"])

if __name__ == "__main__":
    main()

请注意,作业队列直接连接在 ab 进程之间。 a 进程负责放置它们的标识符("master" 应该知道)。 b 使用不同的队列来完成工作。我使用了相同的工作字典,但典型的实现应该使用一些更定制的数据结构。此响应应具有 a 的标识符,以便 master 将其发送到特定进程。

我假设有一些方法可以将它与您的方法一起使用,我一点也不讨厌(这本来是我的第一种方法)。但是必须处理文件描述符以及 reduce_rebuild_ 方法并不好。完全没有。

因此,正如@MariusSiuram 在 中解释的那样,尝试传递 Connection 对象是一种挫败感。

我最终求助于使用 DictProxy 到 return 从 BA 的值。

这是概念:

### This is in the main process
...
jobs_queue = multiprocessing.Queue()
manager = multiprocessing.Manager()
ret_dict = manager.dict()
...
# Somewhere during Process initialization, jobs_queue and ret_dict got passed to
# the workers' constructor
...

### This is in the "A" (left-side) workers
...
self.ret_dict.pop(self.pid, None)  # Remove our identifier if exist
self.jobs_queue.put({
    'request': parameters_to_be_used_by_B,
    'requester': self.pid
})
while self.pid not in self.ret_dict:
    time.sleep(0.1)  # Or any sane value
result = self.ret_dict[self.pid]    
...

### This is in the "B" (right-side) workers
...
while True:
    job = self.jobs_queue.get()
    if job is None:
        break
    result = self.do_something(job['request'])
    self.ret_dict[job['requester']] = result
...