Python concurrent.futures

Python concurrent.futures

我有一个多处理代码,每个进程都必须以不同的方式分析相同的数据。

我已经实现了:

with concurrent.futures.ProcessPoolExecutor() as executor:
   res = executor.map(goal_fcn, p, [global_DataFrame], [global_String])

for f in concurrent.futures.as_completed(res):
   fp = res

和功能:

def goal_fcn(x, DataFrame, String):
   return heavy_calculation(x, DataFrame, String)

问题是 goal_fcn 只被调用一次,而应该是多次

在调试器中,我现在检查变量 p 正在查找,它有多个列和行。在 goal_fcn 中,变量 x 只有第一行 - 看起来不错。

但是这个函数只被调用了一次。没有错误,代码只是执行下一步。

即使我修改变量p = [1,3,4,5],当然还有代码。 goal_fcn只执行一次

我必须使用 map() 因为需要保持输入和输出之间的顺序

map 的工作方式类似于 zip。一旦至少一个输入序列结束,它就会终止。您的 [global_DataFrame][global_String] 列表各有一个元素,所以这就是地图结束的地方。

有两种解决方法:

  1. 使用itertools.product。这相当于 运行“对于所有数据帧,对于所有字符串,对于所有 p”。像这样:
def goal_fcn(x_DataFrame_String):
    x, DataFrame, String = x_DataFrame_String
    ...

executor.map(goal_fcn, itertools.product(p, [global_DataFrame], [global_String]))
  1. 绑定固定参数而不是滥用序列参数。
def goal_fcn(x, DataFrame, String):
    pass

bound = functools.partial(goal_fcn, DataFrame=global_DataFrame, String=global_String)
executor.map(bound, p)