使用 Python joblib 并行调用类的方法
Parallel class function calls using python joblib
可以使用 joblib 并行地多次调用 Python 中的同一个函数。
from joblib import Parallel, delayed
def normal(x):
    """Print a trace line for *x* and return its square.

    A plain module-level function, so it can be pickled by reference and
    handed to ``joblib.Parallel`` via ``delayed(normal)(x)``.
    """
    print("Normal", x)
    return x**2
if __name__ == '__main__':
    # Fan out 20 independent calls to ``normal`` over 2 workers;
    # ``delayed(normal)(x)`` packages "call normal with x" without calling it.
    results = Parallel(n_jobs=2)(delayed(normal)(x) for x in range(20))
    print(results)
给出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
但是,我真正想要的是在一个类实例列表上并行调用类的方法。该方法只是把结果存到一个实例属性里,之后我再访问这个属性。
from joblib import Parallel, delayed
class A(object):
    """Tiny payload whose ``p`` method caches the square of ``x``."""

    def __init__(self, x):
        # Input value that ``p`` will square.
        self.x = x

    def p(self):
        """Store ``x ** 2`` on the instance as ``y``; returns ``None``."""
        self.y = pow(self.x, 2)
if __name__ == '__main__':
    runs = [A(x) for x in range(20)]
    # NOTE(review): this is the failing call the question is about.
    # ``run.p() for run in runs`` is a generator expression, so ``delayed``
    # receives a generator instead of a function, and joblib fails when it
    # tries to pickle it (see the traceback below). The working pattern is
    # ``delayed(some_function)(argument)`` for each item.
    Parallel(n_jobs=4)(delayed(run.p() for run in runs))
    # NOTE(review): even with a working call, process-based workers mutate
    # pickled copies, so ``run.y`` is not set here unless the values are
    # returned from the workers and assigned back.
    for run in runs:
        print run.y
这会报错:
Traceback (most recent call last):
  File "", line 1, in <module>
    runfile('G:/My Drive/CODE/Whosebug/parallel_classfunc/parallel_classfunc.py', wdir='G:/My Drive/CODE/Whosebug/parallel_classfunc')
  File "C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py", line 710, in runfile
    execfile(filename, namespace)
  File "C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py", line 86, in execfile
    exec(compile(scripttext, filename, 'exec'), glob, loc)
  File "G:/My Drive/CODE/Whosebug/parallel_classfunc/parallel_classfunc.py", line 12, in <module>
    Parallel(n_jobs=4)(delayed(run.p() for run in runs))
  File "C:\ProgramData\Anaconda2\lib\site-packages\joblib\parallel.py", line 183, in delayed
    pickle.dumps(function)
  File "C:\ProgramData\Anaconda2\lib\copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle generator objects
如何像这样把 joblib 和类一起使用?或者有没有更好的方法?
How is it possible to use joblib
with classes like this ?
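先提一些代码上的改进建议:
并非任何东西都能放进 joblib.Parallel()( delayed() ) 的调用签名里。delayed() 期望接收一个可调用对象(函数),参数在随后的一对括号中传入;而 run.p() for run in runs 是一个生成器表达式,joblib 无法对生成器做 pickle,于是抛出了上面的 TypeError: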
# >>> type( runs ) <type 'list'>
# >>> type( runs[0] ) <class '__main__.A'>
# >>> type( run.p() for run in runs ) <type 'generator'>
所以,让 DEMO 对象作为参数"穿过"一个顶层函数 aContainerFUN() 来处理
:
Whosebug_DEMO_joblib.Parallel.py
:
# ``sklearn.externals.joblib`` was removed in scikit-learn 0.23; use joblib directly.
from joblib import Parallel, delayed
import time


class A(object):
    """Demo payload: ``p()`` stores ``x ** 2`` on the instance as ``y``."""

    def __init__(self, x):
        self.x = x
        # Placeholder so ``y`` is printable even before ``p()`` has run.
        self.y = "Defined on .__init__()"

    def p(self):
        """Compute and store ``y = x ** 2``."""
        self.y = self.x**2


def aNormalFUN(aValueOfX):
    """Sleep ``aValueOfX / 10`` seconds (simulated work), log, return the square."""
    time.sleep(float(aValueOfX) / 10.)
    print(": aNormalFUN() has got aValueOfX == {0:} to process.".format(aValueOfX))
    return aValueOfX * aValueOfX


def aContainerFUN(aPayloadOBJECT):
    """Run ``aPayloadOBJECT.p()`` inside a worker and hand the object back.

    With process-based joblib backends the worker receives a pickled *copy*
    of the object, so the ``.y`` set here is invisible to the caller unless
    the object is returned and collected from ``Parallel``.

    Args:
        aPayloadOBJECT: any object with an ``x`` attribute and a ``p()``
            method that sets ``y`` (e.g. ``A``).

    Returns:
        The (worker-side) object, now carrying the computed ``.y``.
    """
    time.sleep(float(aPayloadOBJECT.x) / 10.)
    aPayloadOBJECT.p()
    print("| aContainerFUN: has got aPayloadOBJECT.id({0:}) to process. [ Has made .y == {1:}, given .x == {2: } ]".format(id(aPayloadOBJECT), aPayloadOBJECT.y, aPayloadOBJECT.x))
    time.sleep(1)
    return aPayloadOBJECT


if __name__ == '__main__':
    # ------------------------------------------------------------------
    # Plain top-level function: Parallel returns the results as a list.
    results = Parallel(n_jobs=2)(
        delayed(aNormalFUN)(aParameterX)
        for aParameterX in range(11, 21)
    )
    print(results)
    print('.')
    # ------------------------------------------------------------------
    runs = [A(x) for x in range(11, 21)]
    # Pass each object *into* a top-level function instead of handing
    # ``delayed()`` a generator of ``run.p()`` calls (which cannot be pickled).
    # Rebind ``runs`` to the returned objects: the workers operated on copies,
    # so this is how the updated ``.y`` values reach this process.
    runs = Parallel(verbose=10, n_jobs=2)(
        delayed(aContainerFUN)(run)
        for run in runs
    )
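结果?运行得很顺利!不过要注意:使用基于进程的后端时,每个工作进程拿到的是对象经 pickle 传过去的副本,在副本上设置的 .y 不会自动反映到父进程的 runs 列表中;要在父进程中使用这些结果,必须让函数把它们返回,并收集 Parallel(...) 的返回值。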
C:\Python27.anaconda> python Whosebug_DEMO_joblib.Parallel.py
: aNormalFUN() has got aValueOfX == 11 to process.
: aNormalFUN() has got aValueOfX == 12 to process.
: aNormalFUN() has got aValueOfX == 13 to process.
: aNormalFUN() has got aValueOfX == 14 to process.
: aNormalFUN() has got aValueOfX == 15 to process.
: aNormalFUN() has got aValueOfX == 16 to process.
: aNormalFUN() has got aValueOfX == 17 to process.
: aNormalFUN() has got aValueOfX == 18 to process.
: aNormalFUN() has got aValueOfX == 19 to process.
: aNormalFUN() has got aValueOfX == 20 to process.
[121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
.
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 121, given .x == 11 ]
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 144, given .x == 12 ]
[Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 2.4s
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 169, given .x == 13 ]
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 196, given .x == 14 ]
[Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 4.9s
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 225, given .x == 15 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 256, given .x == 16 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 289, given .x == 17 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 324, given .x == 18 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 361, given .x == 19 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 400, given .x == 20 ]
[Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 13.3s finished
首先,参照第一个函数的写法调整类 A,让 p() 返回计算结果:
class A(object):
    """Payload whose ``p`` method both stores and returns ``x ** 2``."""

    def __init__(self, x):
        # Value to be squared by ``p``.
        self.x = x

    def p(self):
        """Set ``self.y`` to the square of ``self.x`` and return it.

        Returning the value (not just storing it) lets ``joblib.Parallel``
        ship the result back from a worker process.
        """
        squared = pow(self.x, 2)
        self.y = squared
        return self.y
现在并行运行上面的类:只需用一个 lambda 函数包装这个调用并交给 delayed(),而不是直接调用 run.p()。
from joblib import Parallel, delayed
class A(object):
    """Holds ``x``; ``p()`` records ``x ** 2`` as ``y`` and returns it."""

    def __init__(self, x):
        self.x = x  # input to square

    def p(self):
        """Return ``x ** 2``, also remembering it on ``self.y``."""
        self.y = pow(self.x, 2)
        return self.y
if __name__ == '__main__':
    runs = [A(x) for x in range(20)]
    with Parallel(n_jobs=6, verbose=5) as parallel:
        # ``delayed`` needs a callable plus its arguments; wrapping the method
        # call in a lambda defers ``run.p()`` to the worker. Plain ``pickle``
        # cannot serialize lambdas -- this relies on the default loky backend,
        # which uses cloudpickle.
        delayed_funcs = [delayed(lambda x: x.p())(run) for run in runs]
        run_A = parallel(delayed_funcs)
    # Workers mutated pickled copies, so the objects in ``runs`` still lack
    # ``y`` here; copy the returned values back so ``run.y`` works afterwards.
    for run, y in zip(runs, run_A):
        run.y = y
    print(run_A)
您的输出如下所示:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done 6 tasks | elapsed: 0.0s
[Parallel(n_jobs=6)]: Done 14 out of 20 | elapsed: 0.0s remaining: 0.0s
[Parallel(n_jobs=6)]: Done 20 out of 20 | elapsed: 0.0s finished
可以使用 joblib 并行地多次调用 Python 中的同一个函数。
from joblib import Parallel, delayed
def normal(x):
    """Print a trace line for *x* and return its square.

    A plain module-level function, so it can be pickled by reference and
    handed to ``joblib.Parallel`` via ``delayed(normal)(x)``.
    """
    print("Normal", x)
    return x**2
if __name__ == '__main__':
    # Fan out 20 independent calls to ``normal`` over 2 workers;
    # ``delayed(normal)(x)`` packages "call normal with x" without calling it.
    results = Parallel(n_jobs=2)(delayed(normal)(x) for x in range(20))
    print(results)
给出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
但是,我真正想要的是在一个类实例列表上并行调用类的方法。该方法只是把结果存到一个实例属性里,之后我再访问这个属性。
from joblib import Parallel, delayed
class A(object):
    """Tiny payload whose ``p`` method caches the square of ``x``."""

    def __init__(self, x):
        # Input value that ``p`` will square.
        self.x = x

    def p(self):
        """Store ``x ** 2`` on the instance as ``y``; returns ``None``."""
        self.y = pow(self.x, 2)
if __name__ == '__main__':
    runs = [A(x) for x in range(20)]
    # NOTE(review): this is the failing call the question is about.
    # ``run.p() for run in runs`` is a generator expression, so ``delayed``
    # receives a generator instead of a function, and joblib fails when it
    # tries to pickle it (see the traceback below). The working pattern is
    # ``delayed(some_function)(argument)`` for each item.
    Parallel(n_jobs=4)(delayed(run.p() for run in runs))
    # NOTE(review): even with a working call, process-based workers mutate
    # pickled copies, so ``run.y`` is not set here unless the values are
    # returned from the workers and assigned back.
    for run in runs:
        print run.y
这会报错:
Traceback (most recent call last):
  File "", line 1, in <module>
    runfile('G:/My Drive/CODE/Whosebug/parallel_classfunc/parallel_classfunc.py', wdir='G:/My Drive/CODE/Whosebug/parallel_classfunc')
  File "C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py", line 710, in runfile
    execfile(filename, namespace)
  File "C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py", line 86, in execfile
    exec(compile(scripttext, filename, 'exec'), glob, loc)
  File "G:/My Drive/CODE/Whosebug/parallel_classfunc/parallel_classfunc.py", line 12, in <module>
    Parallel(n_jobs=4)(delayed(run.p() for run in runs))
  File "C:\ProgramData\Anaconda2\lib\site-packages\joblib\parallel.py", line 183, in delayed
    pickle.dumps(function)
  File "C:\ProgramData\Anaconda2\lib\copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle generator objects
如何像这样把 joblib 和类一起使用?或者有没有更好的方法?
How is it possible to use
joblib
with classes like this ?
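先提一些代码上的改进建议:
并非任何东西都能放进 joblib.Parallel()( delayed() ) 的调用签名里。delayed() 期望接收一个可调用对象(函数),参数在随后的一对括号中传入;而 run.p() for run in runs 是一个生成器表达式,joblib 无法对生成器做 pickle,于是抛出了上面的 TypeError: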
# >>> type( runs ) <type 'list'>
# >>> type( runs[0] ) <class '__main__.A'>
# >>> type( run.p() for run in runs ) <type 'generator'>
所以,让 DEMO 对象作为参数"穿过"一个顶层函数 aContainerFUN() 来处理
:
Whosebug_DEMO_joblib.Parallel.py
:
# ``sklearn.externals.joblib`` was removed in scikit-learn 0.23; use joblib directly.
from joblib import Parallel, delayed
import time


class A(object):
    """Demo payload: ``p()`` stores ``x ** 2`` on the instance as ``y``."""

    def __init__(self, x):
        self.x = x
        # Placeholder so ``y`` is printable even before ``p()`` has run.
        self.y = "Defined on .__init__()"

    def p(self):
        """Compute and store ``y = x ** 2``."""
        self.y = self.x**2


def aNormalFUN(aValueOfX):
    """Sleep ``aValueOfX / 10`` seconds (simulated work), log, return the square."""
    time.sleep(float(aValueOfX) / 10.)
    print(": aNormalFUN() has got aValueOfX == {0:} to process.".format(aValueOfX))
    return aValueOfX * aValueOfX


def aContainerFUN(aPayloadOBJECT):
    """Run ``aPayloadOBJECT.p()`` inside a worker and hand the object back.

    With process-based joblib backends the worker receives a pickled *copy*
    of the object, so the ``.y`` set here is invisible to the caller unless
    the object is returned and collected from ``Parallel``.

    Args:
        aPayloadOBJECT: any object with an ``x`` attribute and a ``p()``
            method that sets ``y`` (e.g. ``A``).

    Returns:
        The (worker-side) object, now carrying the computed ``.y``.
    """
    time.sleep(float(aPayloadOBJECT.x) / 10.)
    aPayloadOBJECT.p()
    print("| aContainerFUN: has got aPayloadOBJECT.id({0:}) to process. [ Has made .y == {1:}, given .x == {2: } ]".format(id(aPayloadOBJECT), aPayloadOBJECT.y, aPayloadOBJECT.x))
    time.sleep(1)
    return aPayloadOBJECT


if __name__ == '__main__':
    # ------------------------------------------------------------------
    # Plain top-level function: Parallel returns the results as a list.
    results = Parallel(n_jobs=2)(
        delayed(aNormalFUN)(aParameterX)
        for aParameterX in range(11, 21)
    )
    print(results)
    print('.')
    # ------------------------------------------------------------------
    runs = [A(x) for x in range(11, 21)]
    # Pass each object *into* a top-level function instead of handing
    # ``delayed()`` a generator of ``run.p()`` calls (which cannot be pickled).
    # Rebind ``runs`` to the returned objects: the workers operated on copies,
    # so this is how the updated ``.y`` values reach this process.
    runs = Parallel(verbose=10, n_jobs=2)(
        delayed(aContainerFUN)(run)
        for run in runs
    )
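结果?运行得很顺利!不过要注意:使用基于进程的后端时,每个工作进程拿到的是对象经 pickle 传过去的副本,在副本上设置的 .y 不会自动反映到父进程的 runs 列表中;要在父进程中使用这些结果,必须让函数把它们返回,并收集 Parallel(...) 的返回值。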
C:\Python27.anaconda> python Whosebug_DEMO_joblib.Parallel.py
: aNormalFUN() has got aValueOfX == 11 to process.
: aNormalFUN() has got aValueOfX == 12 to process.
: aNormalFUN() has got aValueOfX == 13 to process.
: aNormalFUN() has got aValueOfX == 14 to process.
: aNormalFUN() has got aValueOfX == 15 to process.
: aNormalFUN() has got aValueOfX == 16 to process.
: aNormalFUN() has got aValueOfX == 17 to process.
: aNormalFUN() has got aValueOfX == 18 to process.
: aNormalFUN() has got aValueOfX == 19 to process.
: aNormalFUN() has got aValueOfX == 20 to process.
[121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
.
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 121, given .x == 11 ]
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 144, given .x == 12 ]
[Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 2.4s
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 169, given .x == 13 ]
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 196, given .x == 14 ]
[Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 4.9s
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 225, given .x == 15 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 256, given .x == 16 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 289, given .x == 17 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 324, given .x == 18 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 361, given .x == 19 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 400, given .x == 20 ]
[Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 13.3s finished
首先,参照第一个函数的写法调整类 A,让 p() 返回计算结果:
class A(object):
    """Payload whose ``p`` method both stores and returns ``x ** 2``."""

    def __init__(self, x):
        # Value to be squared by ``p``.
        self.x = x

    def p(self):
        """Set ``self.y`` to the square of ``self.x`` and return it.

        Returning the value (not just storing it) lets ``joblib.Parallel``
        ship the result back from a worker process.
        """
        squared = pow(self.x, 2)
        self.y = squared
        return self.y
现在并行运行上面的类:只需用一个 lambda 函数包装这个调用并交给 delayed(),而不是直接调用 run.p()。
from joblib import Parallel, delayed
class A(object):
    """Holds ``x``; ``p()`` records ``x ** 2`` as ``y`` and returns it."""

    def __init__(self, x):
        self.x = x  # input to square

    def p(self):
        """Return ``x ** 2``, also remembering it on ``self.y``."""
        self.y = pow(self.x, 2)
        return self.y
if __name__ == '__main__':
    runs = [A(x) for x in range(20)]
    with Parallel(n_jobs=6, verbose=5) as parallel:
        # ``delayed`` needs a callable plus its arguments; wrapping the method
        # call in a lambda defers ``run.p()`` to the worker. Plain ``pickle``
        # cannot serialize lambdas -- this relies on the default loky backend,
        # which uses cloudpickle.
        delayed_funcs = [delayed(lambda x: x.p())(run) for run in runs]
        run_A = parallel(delayed_funcs)
    # Workers mutated pickled copies, so the objects in ``runs`` still lack
    # ``y`` here; copy the returned values back so ``run.y`` works afterwards.
    for run, y in zip(runs, run_A):
        run.y = y
    print(run_A)
您的输出如下所示:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done 6 tasks | elapsed: 0.0s
[Parallel(n_jobs=6)]: Done 14 out of 20 | elapsed: 0.0s remaining: 0.0s
[Parallel(n_jobs=6)]: Done 20 out of 20 | elapsed: 0.0s finished