使用 python joblib 访问和更改全局数组
Accessing and altering a global array using python joblib
我正在尝试使用 python 中的 joblib 来加速某些数据处理,但我在尝试找出如何将输出分配为所需格式时遇到了问题。我试图生成一个可能过于简单的代码来显示我遇到的问题:
from joblib import Parallel, delayed
import numpy as np
def main():
print "Nested loop array assignment:"
regular()
print "Parallel nested loop assignment using a single process:"
par2(1)
print "Parallel nested loop assignment using multiple process:"
par2(2)
def regular():
# Define variables
a = [0,1,2,3,4]
b = [0,1,2,3,4]
# Set array variable to global and define size and shape
global ab
ab = np.zeros((2,np.size(a),np.size(b)))
# Iterate to populate array
for i in range(0,np.size(a)):
for j in range(0,np.size(b)):
func(i,j,a,b)
# Show array output
print ab
def par2(process):
# Define variables
a2 = [0,1,2,3,4]
b2 = [0,1,2,3,4]
# Set array variable to global and define size and shape
global ab2
ab2 = np.zeros((2,np.size(a2),np.size(b2)))
# Parallel process in order to populate array
Parallel(n_jobs=process)(delayed(func2)(i,j,a2,b2) for i in xrange(0,np.size(a2)) for j in xrange(0,np.size(b2)))
# Show array output
print ab2
def func(i,j,a,b):
# Populate array
ab[0,i,j] = a[i]+b[j]
ab[1,i,j] = a[i]*b[j]
def func2(i,j,a2,b2):
# Populate array
ab2[0,i,j] = a2[i]+b2[j]
ab2[1,i,j] = a2[i]*b2[j]
# Run script
main()
其输出如下所示:
Nested loop array assignment:
[[[ 0. 1. 2. 3. 4.]
[ 1. 2. 3. 4. 5.]
[ 2. 3. 4. 5. 6.]
[ 3. 4. 5. 6. 7.]
[ 4. 5. 6. 7. 8.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 1. 2. 3. 4.]
[ 0. 2. 4. 6. 8.]
[ 0. 3. 6. 9. 12.]
[ 0. 4. 8. 12. 16.]]]
Parallel nested loop assignment using a single process:
[[[ 0. 1. 2. 3. 4.]
[ 1. 2. 3. 4. 5.]
[ 2. 3. 4. 5. 6.]
[ 3. 4. 5. 6. 7.]
[ 4. 5. 6. 7. 8.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 1. 2. 3. 4.]
[ 0. 2. 4. 6. 8.]
[ 0. 3. 6. 9. 12.]
[ 0. 4. 8. 12. 16.]]]
Parallel nested loop assignment using multiple process:
[[[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]]]
从 Google 和 Whosebug 搜索函数看来,当使用 joblib 时,全局数组不会在每个子进程之间共享。我不确定这是 joblib 的限制还是有办法解决这个问题?
实际上我的脚本被其他代码包围,这些代码依赖于这个全局数组的最终输出在 (4,x,x ) 格式,其中 x 是可变的(但通常在 100 到几千之间)。这是我目前考虑并行处理的原因,因为 x = 2400.
的整个过程最多可能需要 2 个小时
joblib 的使用不是必需的(但我喜欢命名法和简单性)所以请随意提出简单的替代方法,最好牢记最终数组的要求。我正在使用 python 2.7.3 和 joblib 0.7.1.
我能够使用 numpy 的内存映射解决这个简单示例的问题。
使用 memmap 并遵循 joblib documentation webpage 上的示例后,我仍然遇到问题,但我通过 pip 升级到最新的 joblib 版本 (0.9.3),并且一切运行顺利。这是工作代码:
from joblib import Parallel, delayed
import numpy as np
import os
import tempfile
import shutil
def main():
print "Nested loop array assignment:"
regular()
print "Parallel nested loop assignment using numpy's memmap:"
par3(4)
def regular():
# Define variables
a = [0,1,2,3,4]
b = [0,1,2,3,4]
# Set array variable to global and define size and shape
global ab
ab = np.zeros((2,np.size(a),np.size(b)))
# Iterate to populate array
for i in range(0,np.size(a)):
for j in range(0,np.size(b)):
func(i,j,a,b)
# Show array output
print ab
def par3(process):
# Creat a temporary directory and define the array path
path = tempfile.mkdtemp()
ab3path = os.path.join(path,'ab3.mmap')
# Define variables
a3 = [0,1,2,3,4]
b3 = [0,1,2,3,4]
# Create the array using numpy's memmap
ab3 = np.memmap(ab3path, dtype=float, shape=(2,np.size(a3),np.size(b3)), mode='w+')
# Parallel process in order to populate array
Parallel(n_jobs=process)(delayed(func3)(i,a3,b3,ab3) for i in xrange(0,np.size(a3)))
# Show array output
print ab3
# Delete the temporary directory and contents
try:
shutil.rmtree(path)
except:
print "Couldn't delete folder: "+str(path)
def func(i,j,a,b):
# Populate array
ab[0,i,j] = a[i]+b[j]
ab[1,i,j] = a[i]*b[j]
def func3(i,a3,b3,ab3):
# Populate array
for j in range(0,np.size(b3)):
ab3[0,i,j] = a3[i]+b3[j]
ab3[1,i,j] = a3[i]*b3[j]
# Run script
main()
给出以下结果:
Nested loop array assignment:
[[[ 0. 1. 2. 3. 4.]
[ 1. 2. 3. 4. 5.]
[ 2. 3. 4. 5. 6.]
[ 3. 4. 5. 6. 7.]
[ 4. 5. 6. 7. 8.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 1. 2. 3. 4.]
[ 0. 2. 4. 6. 8.]
[ 0. 3. 6. 9. 12.]
[ 0. 4. 8. 12. 16.]]]
Parallel nested loop assignment using numpy's memmap:
[[[ 0. 1. 2. 3. 4.]
[ 1. 2. 3. 4. 5.]
[ 2. 3. 4. 5. 6.]
[ 3. 4. 5. 6. 7.]
[ 4. 5. 6. 7. 8.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 1. 2. 3. 4.]
[ 0. 2. 4. 6. 8.]
[ 0. 3. 6. 9. 12.]
[ 0. 4. 8. 12. 16.]]]
我的一些想法供未来的读者注意:
- 在小型阵列上,准备并行环境所花费的时间
(通常称为开销)意味着这个运行速度比
简单的 for 循环。
- 比较更大的数组,例如。将 a 和 a3 设置为
np.arange(0,10000)
、b 和 b3 到 np.arange(0,1000)
给了
"regular" 方法耗时 12.4 秒,joblib 耗时 7.7 秒
方法。
- 开销意味着让每个核心执行速度更快
内部 j 循环(参见 func3)。这是有道理的,因为我只是
启动 10,000 个进程而不是启动 10,000,000
每个过程都需要设置。
我正在使用的 joblib
版本 (0.13.2
),实际上允许我访问大共享 DataFrames
而无需太多麻烦。
当然 DataFrames
需要在并行循环开始之前 pre-allocated 并且每个线程必须只访问它的 DataFrame
部分来写入,但它有效。
data = pd.DataFrame(...)
stats = pd.DataFrame(np.nan, index=np.arange(0, size/step), columns=cols, dtype=np.float64)
Parallel(n_jobs=8, prefer='threads')(
delayed(_threadsafe_func)(data, stats, i, step, other_params)
for i in range(0, size, step))
在 _threadsafe_func
中,可以这样读取或写入 stats
DataFrame
:
index = i/step
print('[' + str(i) + '] Running job with index:', str(int(index)), '/', len(data)/step)
chunk = data[i:i + step]
stats.loc[index, 'mean'] = chunk.mean() # 'mean' is an existing column already filled with np.nan
我正在尝试使用 python 中的 joblib 来加速某些数据处理,但我在尝试找出如何将输出分配为所需格式时遇到了问题。我试图生成一个可能过于简单的代码来显示我遇到的问题:
from joblib import Parallel, delayed
import numpy as np
def main():
print "Nested loop array assignment:"
regular()
print "Parallel nested loop assignment using a single process:"
par2(1)
print "Parallel nested loop assignment using multiple process:"
par2(2)
def regular():
# Define variables
a = [0,1,2,3,4]
b = [0,1,2,3,4]
# Set array variable to global and define size and shape
global ab
ab = np.zeros((2,np.size(a),np.size(b)))
# Iterate to populate array
for i in range(0,np.size(a)):
for j in range(0,np.size(b)):
func(i,j,a,b)
# Show array output
print ab
def par2(process):
# Define variables
a2 = [0,1,2,3,4]
b2 = [0,1,2,3,4]
# Set array variable to global and define size and shape
global ab2
ab2 = np.zeros((2,np.size(a2),np.size(b2)))
# Parallel process in order to populate array
Parallel(n_jobs=process)(delayed(func2)(i,j,a2,b2) for i in xrange(0,np.size(a2)) for j in xrange(0,np.size(b2)))
# Show array output
print ab2
def func(i,j,a,b):
# Populate array
ab[0,i,j] = a[i]+b[j]
ab[1,i,j] = a[i]*b[j]
def func2(i,j,a2,b2):
# Populate array
ab2[0,i,j] = a2[i]+b2[j]
ab2[1,i,j] = a2[i]*b2[j]
# Run script
main()
其输出如下所示:
Nested loop array assignment:
[[[ 0. 1. 2. 3. 4.]
[ 1. 2. 3. 4. 5.]
[ 2. 3. 4. 5. 6.]
[ 3. 4. 5. 6. 7.]
[ 4. 5. 6. 7. 8.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 1. 2. 3. 4.]
[ 0. 2. 4. 6. 8.]
[ 0. 3. 6. 9. 12.]
[ 0. 4. 8. 12. 16.]]]
Parallel nested loop assignment using a single process:
[[[ 0. 1. 2. 3. 4.]
[ 1. 2. 3. 4. 5.]
[ 2. 3. 4. 5. 6.]
[ 3. 4. 5. 6. 7.]
[ 4. 5. 6. 7. 8.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 1. 2. 3. 4.]
[ 0. 2. 4. 6. 8.]
[ 0. 3. 6. 9. 12.]
[ 0. 4. 8. 12. 16.]]]
Parallel nested loop assignment using multiple process:
[[[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.]]]
从 Google 和 Whosebug 搜索函数看来,当使用 joblib 时,全局数组不会在每个子进程之间共享。我不确定这是 joblib 的限制还是有办法解决这个问题?
实际上我的脚本被其他代码包围,这些代码依赖于这个全局数组的最终输出在 (4,x,x ) 格式,其中 x 是可变的(但通常在 100 到几千之间)。这是我目前考虑并行处理的原因,因为 x = 2400.
的整个过程最多可能需要 2 个小时joblib 的使用不是必需的(但我喜欢命名法和简单性)所以请随意提出简单的替代方法,最好牢记最终数组的要求。我正在使用 python 2.7.3 和 joblib 0.7.1.
我能够使用 numpy 的内存映射解决这个简单示例的问题。 使用 memmap 并遵循 joblib documentation webpage 上的示例后,我仍然遇到问题,但我通过 pip 升级到最新的 joblib 版本 (0.9.3),并且一切运行顺利。这是工作代码:
from joblib import Parallel, delayed
import numpy as np
import os
import tempfile
import shutil
def main():
print "Nested loop array assignment:"
regular()
print "Parallel nested loop assignment using numpy's memmap:"
par3(4)
def regular():
# Define variables
a = [0,1,2,3,4]
b = [0,1,2,3,4]
# Set array variable to global and define size and shape
global ab
ab = np.zeros((2,np.size(a),np.size(b)))
# Iterate to populate array
for i in range(0,np.size(a)):
for j in range(0,np.size(b)):
func(i,j,a,b)
# Show array output
print ab
def par3(process):
# Creat a temporary directory and define the array path
path = tempfile.mkdtemp()
ab3path = os.path.join(path,'ab3.mmap')
# Define variables
a3 = [0,1,2,3,4]
b3 = [0,1,2,3,4]
# Create the array using numpy's memmap
ab3 = np.memmap(ab3path, dtype=float, shape=(2,np.size(a3),np.size(b3)), mode='w+')
# Parallel process in order to populate array
Parallel(n_jobs=process)(delayed(func3)(i,a3,b3,ab3) for i in xrange(0,np.size(a3)))
# Show array output
print ab3
# Delete the temporary directory and contents
try:
shutil.rmtree(path)
except:
print "Couldn't delete folder: "+str(path)
def func(i,j,a,b):
# Populate array
ab[0,i,j] = a[i]+b[j]
ab[1,i,j] = a[i]*b[j]
def func3(i,a3,b3,ab3):
# Populate array
for j in range(0,np.size(b3)):
ab3[0,i,j] = a3[i]+b3[j]
ab3[1,i,j] = a3[i]*b3[j]
# Run script
main()
给出以下结果:
Nested loop array assignment:
[[[ 0. 1. 2. 3. 4.]
[ 1. 2. 3. 4. 5.]
[ 2. 3. 4. 5. 6.]
[ 3. 4. 5. 6. 7.]
[ 4. 5. 6. 7. 8.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 1. 2. 3. 4.]
[ 0. 2. 4. 6. 8.]
[ 0. 3. 6. 9. 12.]
[ 0. 4. 8. 12. 16.]]]
Parallel nested loop assignment using numpy's memmap:
[[[ 0. 1. 2. 3. 4.]
[ 1. 2. 3. 4. 5.]
[ 2. 3. 4. 5. 6.]
[ 3. 4. 5. 6. 7.]
[ 4. 5. 6. 7. 8.]]
[[ 0. 0. 0. 0. 0.]
[ 0. 1. 2. 3. 4.]
[ 0. 2. 4. 6. 8.]
[ 0. 3. 6. 9. 12.]
[ 0. 4. 8. 12. 16.]]]
我的一些想法供未来的读者注意:
- 在小型阵列上,准备并行环境所花费的时间 (通常称为开销)意味着这个运行速度比 简单的 for 循环。
- 比较更大的数组,例如。将 a 和 a3 设置为
np.arange(0,10000)
、b 和 b3 到np.arange(0,1000)
给了 "regular" 方法耗时 12.4 秒,joblib 耗时 7.7 秒 方法。 - 开销意味着让每个核心执行速度更快
内部 j 循环(参见 func3)。这是有道理的,因为我只是
启动 10,000 个进程而不是启动 10,000,000
每个过程都需要设置。
我正在使用的 joblib
版本 (0.13.2
),实际上允许我访问大共享 DataFrames
而无需太多麻烦。
当然 DataFrames
需要在并行循环开始之前 pre-allocated 并且每个线程必须只访问它的 DataFrame
部分来写入,但它有效。
data = pd.DataFrame(...)
stats = pd.DataFrame(np.nan, index=np.arange(0, size/step), columns=cols, dtype=np.float64)
Parallel(n_jobs=8, prefer='threads')(
delayed(_threadsafe_func)(data, stats, i, step, other_params)
for i in range(0, size, step))
在 _threadsafe_func
中,可以这样读取或写入 stats
DataFrame
:
index = i/step
print('[' + str(i) + '] Running job with index:', str(int(index)), '/', len(data)/step)
chunk = data[i:i + step]
stats.loc[index, 'mean'] = chunk.mean() # 'mean' is an existing column already filled with np.nan