通过多处理返回二维数组

returning a two dimensional array by multiprocessing

在下面的代码是我的主要代码示例中,我尝试使用 pathos.multiprocessing 来提高循环迭代的速度。使用多处理实现的每次迭代的输出是一个二维数组。我使用 pathos.multiprocessing 而不是 multiprocessing 因为我想在我的 class 方法 中使用它。我使用 pathos.multiprocessingapipe 方法将输出收集到列表中,但它 returns 是一个空列表。我不知道为什么会失败

import numpy as np
import random
import pathos.multiprocessing as mp
class Testsystematics(object):
      def __init__(self, x, y, NTH = None, THMIN = None, THMAX = None, NRESAMPLE = None):
         self.x        = x
         self.y        = y
         self.nbins    = NTH
         self.bmin     = THMIN
         self.bmax     = THMAX
         self.nresample= NRESAMPLE
         self.bins     = np.linspace(self.bmin, self.bmax, self.nbins+1, True).astype(np.float)
         self.sample   = np.array([[random.choice(range(len(self.y))) for _ in xrange(len(self.y))] for i in range(self.nresample)])
         self.result_list=[]
      def log_result(self, result):
          self.result_list.append(result)
      def bootstrapping(self, k):
          xi_p     = np.zeros(self.nbins, float)
          xi_m     = np.zeros(self.nbins, float)
          nind     = np.zeros(self.nbins, float)
          for i in range(len(self.x)):
              for j in range(len(self.x)):
                  if (i!=j): 
                     sep= np.sqrt(self.x[i]**2+self.x[j]**2)
                     index= np.searchsorted(self.bins, sep , side='right')-1 
                     sind = np.sin(sep)
                     if ((sep< self.bins[-1]) and (sep>=self.bins[0])):
                        xi_p[index] += sind*(np.mean(y)-np.median(y))
                        xi_m[index] += sind*np.std(y)
                        nind[index] += 1.0
          for i in range(self.nbins):
              xi_p[i]=xi_p[i]/nind[i]
              xi_m[i]=xi_m[i]/nind[i]
          return np.vstack((xi_p,xi_m))
      def twopcf(self):   
         if (self.sys_type==1):
            pool = mp.ProcessingPool(16)
            for n in range(self.nresample):
                pool.apipe(self.bootstrapping, args=(n,), callback=self.log_result)

shape,scale=0.5, 0.6
x=np.random.gamma(shape, scale, 10000)
mu1, sigma1 = 0, 0.5 # mean and standard deviation
mu2, sigma2 = 0.1, 0.7 # mean and standard deviation

y = np.random.normal(mu1, sigma1, 1000)+np.random.normal(mu2, sigma2, 1000)
sysTest=Testsystematics(x, y, NTH = 10, THMIN = 0, THMAX = 5, NRESAMPLE = 100)

有什么建议吗?

我是 pathos 的作者。我试过你的代码,它运行了,但没有产生错误,也没有在 result_list 中产生任何结果。我相信那是因为您使用 apipe 不正确。 apipe的正确用法如下:

>>> import pathos
>>> def squared(x):
...   return x**2
... 
>>> pool = pathos.multiprocessing.ProcessingPool()
>>> res = pool.apipe(squared, 5)
>>> res.get()
25

self.bootstrapping 需要 selfk,因此当您将其作为实例方法调用时,必须在管道调用中提供 k。没有 callback -- 如果你想要一个回调,你需要在你的函数中添加一个。

请注意,return 值是通过以下方式检索的:(1) 获取 return 对象,以及 (2) 通过调用 return 对象上的 get

根据你在 for 循环中使用 apipe,我建议你改用 pool.amap(或 pool.imap)——然后你可以这样做并行循环。