dask.distributed 没有利用集群

dask.distributed not utilising the cluster

我无法使用分布式集群处理这个块。

import pandas as pd
from dask import dataframe as dd 
import dask

df = pd.DataFrame({'reid_encod': [[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10],[1,2,3,4,5,6,7,8,9,10]]})
dask_df = dd.from_pandas(df, npartitions=3)
save_val = []
def add(dask_df):
    for _, outer_row in dask_df.iterrows():
        for _, inner_row in dask_df.iterrows():
            for base_encod in outer_row['reid_encod']:
               for compare_encod in inner_row['reid_encod']:
                   val = base_encod + compare_encod
                   save_val.append(val)
    return save_val

from dask.distributed import Client

client = Client(...)
dask_compute = dask.delayed(add)(dask_df)
dask_compute.compute()

我也有一些疑问

  1. dask.delayed是否使用可用的集群进行计算。

  2. 我能否使用延迟并行化此 pandas DF 的 for 循环迭代,并使用集群中存在的多台计算机进行计算。

  3. dask.distributed 是否在 pandas 数据帧上工作。

  4. 我们可以在dask.distributed中使用dask.delayed吗?

  5. 如果上面的编程方式有误,能否指导一下上面的场景是选择delayed还是dask DF

this section 最佳实践中概述了代码的主要问题:不要将 Dask 集合传递给延迟函数。这意味着,您应该使用 delayed API dataframe API .虽然您可以转换数据帧<->延迟,但不建议像这样简单地传递。

此外,

  • 你的数据框中只有一行,所以你只有一个分区,没有任何并行性。你只能这样放慢速度。
  • 这似乎是一个一切到一切 (N^2) 的操作,所以如果你有很多行(Dask 的正常情况),无论你使用多少核,它都可能需要很长时间
  • 在 pandas 行中传递列表不是一个好主意,也许您想使用数组?
  • 该函数 return 没有任何用处,因此您完全不清楚您要实现的目标。在 MVCE 的描述下,您将看到对 "expected outcome" 和 "what went wrong" 的引用。为了获得更多帮助,请更准确。

作为记录,一些答案,虽然我想指出我之前关于这个问题的一般观点

Does dask.delayed use the available clusters to do the computation.

如果您已经为分布式集群创建了客户端,除非您另有说明,否则 dask 将使用它进行计算。

Can I paralleize the for loop iteratition of this pandas DF using delayed, and use multiple computers present in the cluster to do computations.

是的,如果您愿意,您通常可以使用带有 pandas 数据帧的延迟来实现并行性。但是,您的数据框只有一行,因此在这种情况下并不明显 如何 - 这取决于您真正想要实现的目标。

does dask.distributed work on pandas dataframe.

是的,您可以做任何 python 可以用分布式做的事情,因为它只是 python 进程执行代码。它是否为您带来您所追求的性能是一个单独的问题

can we use dask.delayed in dask.distributed.

是的,分布式可以执行 dask 通常可以执行的任何操作,包括延迟 functions/objects

If the above programming approach is wrong, can you guide me whether to choose delayed or dask DF for the above scenario.

不容易,我根本不清楚这是一个数据帧操作。它看起来更像是一个数组 - 但是,我再次注意到您的函数实际上 return 根本没有任何用处。

在教程中:passing pandas dataframes to delayed ; same with dataframe API