通过多处理返回二维数组
returning a two dimensional array by multiprocessing
在下面的代码是我的主要代码示例中,我尝试使用 pathos.multiprocessing
来提高循环迭代的速度。使用多处理实现的每次迭代的输出是一个二维数组。我使用 pathos.multiprocessing
而不是 multiprocessing
因为我想在我的 class 方法 中使用它。我使用 pathos.multiprocessing
的 apipe
方法将输出收集到列表中,但它 returns 是一个空列表。我不知道为什么会失败
import numpy as np
import random
import pathos.multiprocessing as mp
class Testsystematics(object):
def __init__(self, x, y, NTH = None, THMIN = None, THMAX = None, NRESAMPLE = None):
self.x = x
self.y = y
self.nbins = NTH
self.bmin = THMIN
self.bmax = THMAX
self.nresample= NRESAMPLE
self.bins = np.linspace(self.bmin, self.bmax, self.nbins+1, True).astype(np.float)
self.sample = np.array([[random.choice(range(len(self.y))) for _ in xrange(len(self.y))] for i in range(self.nresample)])
self.result_list=[]
def log_result(self, result):
self.result_list.append(result)
def bootstrapping(self, k):
xi_p = np.zeros(self.nbins, float)
xi_m = np.zeros(self.nbins, float)
nind = np.zeros(self.nbins, float)
for i in range(len(self.x)):
for j in range(len(self.x)):
if (i!=j):
sep= np.sqrt(self.x[i]**2+self.x[j]**2)
index= np.searchsorted(self.bins, sep , side='right')-1
sind = np.sin(sep)
if ((sep< self.bins[-1]) and (sep>=self.bins[0])):
xi_p[index] += sind*(np.mean(y)-np.median(y))
xi_m[index] += sind*np.std(y)
nind[index] += 1.0
for i in range(self.nbins):
xi_p[i]=xi_p[i]/nind[i]
xi_m[i]=xi_m[i]/nind[i]
return np.vstack((xi_p,xi_m))
def twopcf(self):
if (self.sys_type==1):
pool = mp.ProcessingPool(16)
for n in range(self.nresample):
pool.apipe(self.bootstrapping, args=(n,), callback=self.log_result)
shape,scale=0.5, 0.6
x=np.random.gamma(shape, scale, 10000)
mu1, sigma1 = 0, 0.5 # mean and standard deviation
mu2, sigma2 = 0.1, 0.7 # mean and standard deviation
y = np.random.normal(mu1, sigma1, 1000)+np.random.normal(mu2, sigma2, 1000)
sysTest=Testsystematics(x, y, NTH = 10, THMIN = 0, THMAX = 5, NRESAMPLE = 100)
有什么建议吗?
我是 pathos
的作者。我试过你的代码,它运行了,但没有产生错误,也没有在 result_list
中产生任何结果。我相信那是因为您使用 apipe
不正确。 apipe
的正确用法如下:
>>> import pathos
>>> def squared(x):
... return x**2
...
>>> pool = pathos.multiprocessing.ProcessingPool()
>>> res = pool.apipe(squared, 5)
>>> res.get()
25
self.bootstrapping
需要 self
和 k
,因此当您将其作为实例方法调用时,必须在管道调用中提供 k
。没有 callback
-- 如果你想要一个回调,你需要在你的函数中添加一个。
请注意,return 值是通过以下方式检索的:(1) 获取 return 对象,以及 (2) 通过调用 return 对象上的 get
。
根据你在 for
循环中使用 apipe
,我建议你改用 pool.amap
(或 pool.imap
)——然后你可以这样做并行循环。
在下面的代码是我的主要代码示例中,我尝试使用 pathos.multiprocessing
来提高循环迭代的速度。使用多处理实现的每次迭代的输出是一个二维数组。我使用 pathos.multiprocessing
而不是 multiprocessing
因为我想在我的 class 方法 中使用它。我使用 pathos.multiprocessing
的 apipe
方法将输出收集到列表中,但它 returns 是一个空列表。我不知道为什么会失败
import numpy as np
import random
import pathos.multiprocessing as mp
class Testsystematics(object):
def __init__(self, x, y, NTH = None, THMIN = None, THMAX = None, NRESAMPLE = None):
self.x = x
self.y = y
self.nbins = NTH
self.bmin = THMIN
self.bmax = THMAX
self.nresample= NRESAMPLE
self.bins = np.linspace(self.bmin, self.bmax, self.nbins+1, True).astype(np.float)
self.sample = np.array([[random.choice(range(len(self.y))) for _ in xrange(len(self.y))] for i in range(self.nresample)])
self.result_list=[]
def log_result(self, result):
self.result_list.append(result)
def bootstrapping(self, k):
xi_p = np.zeros(self.nbins, float)
xi_m = np.zeros(self.nbins, float)
nind = np.zeros(self.nbins, float)
for i in range(len(self.x)):
for j in range(len(self.x)):
if (i!=j):
sep= np.sqrt(self.x[i]**2+self.x[j]**2)
index= np.searchsorted(self.bins, sep , side='right')-1
sind = np.sin(sep)
if ((sep< self.bins[-1]) and (sep>=self.bins[0])):
xi_p[index] += sind*(np.mean(y)-np.median(y))
xi_m[index] += sind*np.std(y)
nind[index] += 1.0
for i in range(self.nbins):
xi_p[i]=xi_p[i]/nind[i]
xi_m[i]=xi_m[i]/nind[i]
return np.vstack((xi_p,xi_m))
def twopcf(self):
if (self.sys_type==1):
pool = mp.ProcessingPool(16)
for n in range(self.nresample):
pool.apipe(self.bootstrapping, args=(n,), callback=self.log_result)
shape,scale=0.5, 0.6
x=np.random.gamma(shape, scale, 10000)
mu1, sigma1 = 0, 0.5 # mean and standard deviation
mu2, sigma2 = 0.1, 0.7 # mean and standard deviation
y = np.random.normal(mu1, sigma1, 1000)+np.random.normal(mu2, sigma2, 1000)
sysTest=Testsystematics(x, y, NTH = 10, THMIN = 0, THMAX = 5, NRESAMPLE = 100)
有什么建议吗?
我是 pathos
的作者。我试过你的代码,它运行了,但没有产生错误,也没有在 result_list
中产生任何结果。我相信那是因为您使用 apipe
不正确。 apipe
的正确用法如下:
>>> import pathos
>>> def squared(x):
... return x**2
...
>>> pool = pathos.multiprocessing.ProcessingPool()
>>> res = pool.apipe(squared, 5)
>>> res.get()
25
self.bootstrapping
需要 self
和 k
,因此当您将其作为实例方法调用时,必须在管道调用中提供 k
。没有 callback
-- 如果你想要一个回调,你需要在你的函数中添加一个。
请注意,return 值是通过以下方式检索的:(1) 获取 return 对象,以及 (2) 通过调用 return 对象上的 get
。
根据你在 for
循环中使用 apipe
,我建议你改用 pool.amap
(或 pool.imap
)——然后你可以这样做并行循环。