How to parallelise a nested loop in dask
I have the following code:
    while (i< 10):
        for i in range(0, len(df_1)):
            new_df_1 = df_1.iloc[i]
            for j in (len(df_2)):
                new_df_2 = df_2.iloc[j]
                client.compute(self.func(i, new_df_1, new_df_2), scheduler="processes"),
        break
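I don't know how to use dask inside a nested loop like this to speed up the code. I tried turning the inner loop into a function, as shown below, but it raised an error.
Here is what I tried: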
    while (i< 10):
        for i in range(0, len(df_1)):
            new_df_1 = df_1.iloc[i]
            def process_l(i, client, new_df_1, new_df_2):
                for j in (len(df_2)):
                    new_df_2 = df_2.iloc[j]
                    client.compute(self.func(i, new_df_1, new_df_2), scheduler="processes"),
                    break
            client.submit(process_l(i, new_df_1, new_df_2)
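Calling .compute() blocks further execution of the code until the result of that .compute() call is ready. Instead, you probably want to use delayed or client.submit. Note also that self.func(i, new_df_1, new_df_2) runs the function immediately in the calling process and passes only its result on, whereas client.submit takes the function and its arguments separately. Here is a rough suggestion: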
    futs = []
    # cap the outer loop at 10 iterations to avoid the while loop
    for i in range(0, min(10, len(df_1))):
        new_df_1 = df_1.iloc[i]
        for j in range(0, len(df_2)):
            new_df_2 = df_2.iloc[j]
            # this will submit a future and proceed with the code without
            # waiting for the result; extra keyword arguments to client.submit
            # are forwarded to the function, so scheduler="processes" is dropped
            fut = client.submit(self.func, i, new_df_1, new_df_2)
            futs.append(fut)
    results = client.gather(futs)  # this waits for all results
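The delayed route mentioned above could look roughly like the sketch below. It is only an illustration under stated assumptions: `func` is a hypothetical stand-in for `self.func`, the column names `a` and `b` and the helper `build_tasks` (with its `max_outer` parameter) are made up for the example, and the threaded scheduler is used so that it runs without a distributed cluster.

```python
import dask
import pandas as pd
from dask import delayed


def func(i, row_1, row_2):
    # Hypothetical stand-in for self.func: combines one row from each frame.
    return i, float(row_1["a"] * row_2["b"])


def build_tasks(df_1, df_2, max_outer=10):
    """Build one lazy task per (row of df_1, row of df_2) pair."""
    tasks = []
    for i in range(min(max_outer, len(df_1))):
        row_1 = df_1.iloc[i]
        for j in range(len(df_2)):
            row_2 = df_2.iloc[j]
            # delayed(func)(...) only records the call; nothing runs yet.
            tasks.append(delayed(func)(i, row_1, row_2))
    return tasks


if __name__ == "__main__":
    # Made-up sample data for the illustration.
    df_1 = pd.DataFrame({"a": [1, 2, 3]})
    df_2 = pd.DataFrame({"b": [10, 20]})
    tasks = build_tasks(df_1, df_2)
    # One compute call runs every task and blocks only once, at the end.
    results = dask.compute(*tasks, scheduler="threads")
    print(results)
```

The key difference from the code in the question is that compute is called once on the whole list of tasks rather than once per iteration, so the scheduler can run the tasks in parallel. If a distributed client has already been created, dask.compute should use it by default and the scheduler argument can be left out; without a client, scheduler="processes" is the place where that option belongs.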