为什么与 DASK Delayed 合并比与 DASK 内置命令合并花费的时间要多得多?

Why merging with DASK Delayed takes extremely more time than merging with DASK built-in command?

我想将形状为 df1.shape = (80000, 18) 的大 pandas 数据框合并到形状为 df2.shape = (1, 18) 的小数据框一个名为“键”的列。这是使用 dd.merge:

的时间表现
ddf1 = from_pandas(df1, npartitions=20)
ddf2 = from_pandas(df2, npartitions=1)
start = time.time()
pred_mldf = dd.merge(ddf1 , ddf2, on =['key'])
print(pred_mldf)
t0 = time.time()
print("deltat = ", t0 - start)

结果是 deltat = 0.04。

然后我开始使用 dask delayed 以这种方式实现它:

def mymerge(df1, df2, key):
    pred_mldf = pd.merge(df1, df2, on = key)
    return pred_mldf

start = time.time()
pred_mldf = dask.delayed(mymerge)(df1, df2, ['key'])
pred_mldf.compute()
t0 = time.time()
print("deltat = ", t0 - start)

结果是 deltat = 3.48。

我的假设是我需要用两种方法达到相同的时间性能。我在这里做错了什么?

正如@Nick Becker 在评论中指出的那样,现在您的第一个代码块仅定义合并,但不执行它(而第二个代码块执行),因此添加 .compute() 应该给出一个不同的合并时间:

ddf1 = from_pandas(df1, npartitions=20)
ddf2 = from_pandas(df2, npartitions=1)
start = time.time()
pred_mldf = dd.merge(ddf1 , ddf2, on =['key']).compute()
print(pred_mldf)
t0 = time.time()
print("deltat = ", t0 - start)

执行速度不同的另一个原因是,在第二个代码块中,您将完整的 df1 传递给延迟函数。如果 df1 很大,那么将它分成 20 个块(就像在第一个代码块中一样)并将它们分别传递给延迟函数可能更公平一些。