How to parallelise a nested loop in dask

I have the following code:

while (i < 10):
    for i in range(0, len(df_1)):
        new_df_1 = df_1.iloc[i]
        for j in range(len(df_2)):
            new_df_2 = df_2.iloc[j]
            client.compute(self.func(i, new_df_1, new_df_2), scheduler="processes")
            break

I don't know how to use dask in a nested loop like this to speed up the code. I tried to turn the inner loop into a function, as shown below, but it raised an error.

Here is what I tried:

while (i < 10):
    for i in range(0, len(df_1)):
        new_df_1 = df_1.iloc[i]

        def process_l(i, client, new_df_1, new_df_2):
            for j in range(len(df_2)):
                new_df_2 = df_2.iloc[j]
                client.compute(self.func(i, new_df_1, new_df_2), scheduler="processes")
                break

        client.submit(process_l, i, client, new_df_1, new_df_2)
    

Calling .compute() blocks further execution of your code until the result of .compute() is ready. Instead, you probably want to use delayed or client.submit, which schedule work without waiting for it. Here is a rough suggestion:

futs = []
# to avoid the while loop
for i in range(0, min(10, len(df_1))):
    new_df_1 = df_1.iloc[i]
    for j in range(0, len(df_2)):
        new_df_2 = df_2.iloc[j]

        # this will submit future and proceed with the code without
        # waiting for the result
        # note: no scheduler kwarg here — client.submit forwards extra
        # kwargs to the function itself; the Client decides scheduling
        fut = client.submit(self.func, i, new_df_1, new_df_2)
        futs.append(fut)

results = client.gather(futs) # this waits for all results
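For comparison, the same pattern can be sketched with the dask.delayed interface, which builds the whole task graph first and executes it with a single compute() call. This is a minimal self-contained sketch: `func`, `df_1`, and `df_2` are hypothetical stand-ins for the objects in the question.

```python
import pandas as pd
import dask
from dask import delayed

# hypothetical stand-ins for the question's dataframes and self.func
df_1 = pd.DataFrame({"a": range(4)})
df_2 = pd.DataFrame({"b": range(3)})

def func(i, row_1, row_2):
    # placeholder computation on one row from each dataframe
    return i + row_1["a"] + row_2["b"]

# build the task graph lazily; nothing runs yet
tasks = []
for i in range(min(10, len(df_1))):
    new_df_1 = df_1.iloc[i]
    for j in range(len(df_2)):
        new_df_2 = df_2.iloc[j]
        tasks.append(delayed(func)(i, new_df_1, new_df_2))

# one compute() call runs all tasks in parallel and gathers the results
results = dask.compute(*tasks)
```

This avoids holding a Client at all; for CPU-bound work you can pass `scheduler="processes"` to dask.compute itself, where that keyword actually belongs.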