Bug or feature? Unable to do two consecutive steps of multiprocessing in a python script

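I have two consecutive functions that process large lists.

I call them one after the other, using joblib's Parallel and delayed, to speed up each of the two functions separately.

However, as soon as Parallel calls function_2, I see the output of function_1 again, and I don't understand why. In short, function_2 never gets called.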

Main code:

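from mycode import function_1, function_2   # both worker functions are imported from mycode
from joblib import Parallel, delayed
import gc

if __name__ == '__main__':
    # list_1, list_2 and do_other_stuff() are defined elsewhere in my code
    items = list_1                          # avoid shadowing the built-in name `list`
    print ">>> First call"
    # first step: default (process-based) backend
    Parallel(n_jobs=-1)(delayed(function_1)(item) for item in items)
    gc.collect()
    do_other_stuff()
    items = list_2
    print ">>> Second call"
    # second step: thread-based backend
    Parallel(n_jobs=-1, backend='threading')(delayed(function_2)(item) for item in items)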

Worker functions:

def function_1(item):  # gets called first; takes one item, as passed by delayed(function_1)(item)
    print "this comes from function 1"

def function_2(item):  # gets called second
    print "this comes from function 2"

Output:

>>> First call
this comes from function 1
this comes from function 1
this comes from function 1
this comes from function 1
>>> Second call
this comes from function 1
this comes from function 1
this comes from function 1
this comes from function 1

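My hypothesis: some part of function_1 stays in memory and is retained after the call (perhaps because of joblib's memory-mapping/sharing features?).

That is why I call gc.collect() between the calls. Since that doesn't help, I considered reloading the modules (joblib, Parallel, delayed) between the calls, which seems ugly.

Has anyone experienced similar behavior (on Windows)?

Is there any fix?

Do I need to unload/reload the joblib or mycode modules between the Parallel steps, and if so, why?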

Short version:

Q1: Has anyone experienced similar behavior (on Windows)?
A1:

Q2: Is there any fix?
A2: No, cf. A1.

Q3: Do I need to unload/reload the joblib or mycode modules?
A3: No, cf. A1.

Q4: If yes (Q3), why?
A4: N/A, cf. A3.


Let's ground the discussion in a common MCVE (Minimal, Complete, Verifiable Example):

A slightly modified experiment could look like this:

#ass;                         import function_2
from sklearn.externals.joblib import Parallel, delayed
#ass;                         import gc
pass;                         import itertools

def function_1( aParam = "" ):                                          # Gets called first
    try:
         print "this comes from a call: function_1( aParam == {0:})".format( aParam )

    except:
         pass # die in silence

    finally:
         return aParam

def function_2( aParam = "" ):                                          # Gets called second
    try:
         print "this comes from a call: function_2( aParam == {0:})".format( aParam )

    except:
         pass # die in silence

    finally:
         return aParam

if __name__ == '__main__':
   print "-------------------------------------------------------------- vvv main.START()"
   #ist = list_1
   aList = [ 11, 12, 13, 14, 15, ]
   print "-------------------------------------------------------------- >>> First call"
   A = Parallel(                                               n_jobs = -1
                 )( delayed( function_1 ) ( item ) for item in aList
                    )
   print "-------------------------------------------------------------- vvv main was ret'd: {0:}".format( repr( A ) )
   #c.collect()
   #o_other_stuff()
   #ist = list_2
   aList = [ 21, 22, 23, 24, 25, ]
   print "-------------------------------------------------------------- >>> Second call"
   B = Parallel(                                               n_jobs  = -1,
                                                               backend = 'threading'
                 )( delayed( function_2 ) ( item ) for item in aList
                    )
   print "-------------------------------------------------------------- vvv main was ret'd: {0:}".format( repr( B ) )

Result:

C:\Python27.anaconda>python TEST_SO_Parallel.py
-------------------------------------------------------------- vvv main.START()
-------------------------------------------------------------- >>> First call
this comes from a call: function_1( aParam == 11)
this comes from a call: function_1( aParam == 12)
this comes from a call: function_1( aParam == 13)
this comes from a call: function_1( aParam == 14)
this comes from a call: function_1( aParam == 15)
-------------------------------------------------------------- vvv main was ret'd: [11, 12, 13, 14, 15]
-------------------------------------------------------------- >>> Second call
this comes from a call: function_2( aParam == 21)
this comes from a call: function_2( aParam == 22)
this comes from a call: function_2( aParam == 23)
 this comes from a call: function_2( aParam == 25)this comes from a call: function_2( aParam == 24)

-------------------------------------------------------------- vvv main was ret'd: [21, 22, 23, 24, 25]

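Evaluation:

As observed, [win] py2.7 processed the code without any of the obstacles reported above.

joblib processed the calls exactly as documented.

The reported behavior could not be reproduced, let alone traced back to joblib as part of the causal chain.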
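The interleaved line in the second-call output (25 printed before 24) is expected with a thread-based backend: each worker prints as soon as it runs, while the returned list is still assembled in input order, which is why B comes back as [21, 22, 23, 24, 25]. A minimal sketch of that effect using only the standard library: concurrent.futures.ThreadPoolExecutor.map stands in for Parallel(backend='threading') here (it is not joblib), and run_threaded is a hypothetical helper name.

```python
from concurrent.futures import ThreadPoolExecutor
import random
import time


def function_2(a_param):
    # Sleep a random amount so threads finish (and print) in an unpredictable order.
    time.sleep(random.uniform(0, 0.05))
    print("this comes from a call: function_2( aParam == {0})".format(a_param))
    return a_param


def run_threaded(func, items, workers=5):
    # Hypothetical helper: map() yields results in input order,
    # regardless of which thread finished first.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(func, items))


B = run_threaded(function_2, [21, 22, 23, 24, 25])
print(B)  # the printed lines above may be shuffled; B itself is in input order
```

The print order depends on thread scheduling, so only the returned list is a stable thing to check.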

I ran into the same problem.

My code is as follows:

A = Parallel(n_jobs=1)(delayed(self.function_1)( df_1, item ) for item in list_of_items)

B = Parallel(n_jobs=1)(delayed(self.function_2)( df_2, item ) for item in list_of_items)

where the "list_of_items" variable has 2 items.

But the output is...

[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:   32.2s finished
[Parallel(n_jobs=1)]: Done   0 out of   0 | elapsed:    0.0s finished

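The reason the second Parallel call didn't run (at least in my case) was that my "list_of_items" was a generator, not a list! A generator can be iterated only once, so the first Parallel call consumed all of its items and the second one received none, hence "Done 0 out of 0". Converting it with list(...) before the first call fixes this.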
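A minimal sketch of that pitfall, assuming nothing beyond the standard library: concurrent.futures.ThreadPoolExecutor.map stands in for Parallel(...)(delayed(...)(item) for item in ...) here (it is not joblib), and function_1, function_2 and run_both are illustrative placeholders. Any consumer, joblib included, drains a generator on its first pass, so the second pass sees zero items.

```python
from concurrent.futures import ThreadPoolExecutor


def function_1(item):
    return ("f1", item)


def function_2(item):
    return ("f2", item)


def run_both(items):
    # Hypothetical helper: two consecutive parallel passes over the same iterable.
    with ThreadPoolExecutor(max_workers=4) as pool:
        a = list(pool.map(function_1, items))
        b = list(pool.map(function_2, items))  # a generator is already exhausted here
    return a, b


# Passing a generator: the first pass consumes it, the second gets nothing.
a, b = run_both(i for i in [1, 2])
print(len(a), len(b))  # 2 0

# Materializing the items into a list lets both passes see them.
a, b = run_both(list(i for i in [1, 2]))
print(len(a), len(b))  # 2 2
```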

I hope this solves your problem too. :)