如何将 *args 移交给 pathos 的 amap

how to handover *args to pathos' amap

我有以下问题:我想使用来自 pathos.multiprocessing 的地图。

import pathos as pt

class Foo:

    def __init__(self):
        pass

    def f(self, a, b, c):
        return a + b + c

    def g(self):  
        pool = pt.multiprocessing.ProcessPool()
        def _h(a, b, c):
            k = self.f(a, b, c)
            return k/2
        result = [pool.amap(_h, (i, j, k)) for i in range(3) for j in range(5) for k in range(7)]
        return result

a = Foo()
b = a.g()
b[0].get()

f和g这两个函数是很有必要的,虽然我可以在f中做任何事情。

如果我 运行 这段代码我得到 g 需要 3 个参数但给出了一个。

TypeError: _h() takes exactly 3 arguments (1 given)

我该如何解决这个问题?

我知道这看起来很奇怪,但我喜欢在悲情中将数据作为可迭代传递的想法。

要实现您正在寻找的内容,您必须将元素作为单独的可迭代项传递。这是一个例子:

def g(self):  
    pool = pt.multiprocessing.ProcessPool()
    def _h(a, b, c):
        k = self.f(a, b, c)
        return k/2
    result = [pool.amap(_h, (i,),( j,),(k,))) for i in range(3) for j in range(5) for k in range(7)]
    return result

请注意这里传递参数的奇怪方式:pool.amap(_h, (i,),( j,),(k,))

如果你跟踪 _pool.map_async(star(f), zip(*args)) 调用一下,你就会知道为什么这样做了。

一般来说,您可能希望使用不同的参数多次调用您的函数。这是我所做的演示:

def g(self):  
    pool = pt.multiprocessing.ProcessPool()
    def _h(a, b, c):
        print('a: {0} b: {1}. c: {2}'.format(a, b, c))
        k = self.f(a, b, c)
        return k/2
    return [pool.amap(_h, (1,4), (2,5), (3,6))]

即使我显式调用函数一次,它也执行了两次。 输出:

a: 1 b: 2. c: 3
a: 4 b: 5. c: 6

希望对您有所帮助。

因为amap定义为:

149     def amap(self, f, *args, **kwds): # register a callback ?
...
152         return _pool.map_async(star(f), zip(*args)) # chunksize

源码中有使用示例:

pool.amap(pow, [1,2,3,4], [5,6,7,8])

给定 l 作为您的输入:

l = [(i, j, k) for i in range(3) for j in range(5) for k in range(7)]

您可以转置您的输入:

results = pool.amap(_h, *map(list, zip(*l)))

或使用生成器,这应该更快:

def getter(n):
    for e in l:
        yield e[n]
result = pool.amap(_h, *[getter(n) for n in range(3)])

或者,使用 apipe api 代替:

results = [pool.apipe(_h, l)]

当然,一旦你有了想法,你可以让输入更适合它的界面。但是为什么不直接使用 multiprocessing.pool.async_apply 呢,它的界面和你最初预期的一模一样。