dask 分布式数据帧上的慢 len 函数

Slow len function on dask distributed dataframe

我一直在测试如何使用 dask(具有 20 个内核的集群),我对调用 len 函数与通过 loc 切片的速度感到惊讶。

import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')

log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)

#This is the code than runs slowly 
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)

print(len(logd))

#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()

知道为什么会这样吗? len 运行得快对我来说并不重要,但我觉得由于不理解这种行为,所以我对库有一些不了解的地方。

所有绿色框对应于 "from_pandas",而在 Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes 的这篇文章中,调用图看起来更好(len_chunk 被调用,速度明显更快,调用不'似乎被锁定并等待一个工人完成他的任务,然后再开始另一个)

好问题,这涉及到数据何时向上移动到集群并返回到客户端(您的 python 会话)的几点。让我们看看您计算的几个阶段

使用 Pandas

加载数据

这是您 python 会话中的 Pandas 数据框,因此它显然仍在您的本地进程中。

log = pd.read_csv('800000test', sep='\t')  # on client

转换为懒惰 Dask.dataframe

这会将您的 Pandas 数据帧分解为二十个 Pandas 数据帧,但这些数据帧仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。

logd = dd.from_pandas(log,npartitions=20)  # still on client

计算长度

调用 len 实际上会导致此处的计算(通常您会使用 df.some_aggregation().compute()。所以现在 Dask 开始了。首先它将您的数据移出到集群(慢)然后它调用 len在 20 个分区中(快速),它聚合这些分区(快速),然后将结果向下移动到您的客户端,以便它可以打印。

print(len(logd))  # costly roundtrip client -> cluster -> client

分析

所以这里的问题是我们的 dask.dataframe 仍然在本地 python 会话中拥有所有数据。

使用本地线程调度程序比分布式调度程序要快得多。这应该以毫秒计算

with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads
    print(len(logd))  # stays on client

但您可能想知道如何扩展到更大的数据集,所以让我们以正确的方式进行。

加载你的工人数据

让 Dask 工作人员加载 csv 文件的位,而不是在您的 client/local 会话中加载 Pandas。这样就不需要 client-worker 通信了。

# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t')    # on cluster workers

然而,与 pd.read_csv 不同的是,dd.read_csv 是懒惰的,所以这应该 return 几乎立即。我们可以强制 Dask 使用 persist 方法实际进行计算

log = client.persist(log)  # triggers computation asynchronously

现在集群开始运行并直接在工作进程中加载​​您的数据。这个比较快。请注意,此方法 return 会立即在后台进行工作。如果您想等到它完成,请调用 wait.

from dask.distributed import wait
wait(log)  # blocks until read is done

如果您使用小型数据集进行测试并希望获得更多分区,请尝试更改块大小。

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks

无论如何,log 上的操作现在应该很快

len(log)  # fast

编辑

为了回答关于 this blogpost 的问题,这里是我们对文件所在位置所做的假设。

通常,当您向 dd.read_csv 提供文件名时,它假定该文件对所有工作人员都是可见的。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您需要使用绝对路径(如 /path/to/myfile.*.csv)或者确保您的工作人员和客户端具有相同的工作目录。

如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。

简单但sub-optimal

简单的方法就是做你最初做的事情,但坚持你的dask.dataframe

log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log,npartitions=20)  # still on client
logd = client.persist(logd)  # moves to workers

这很好,但会导致轻微的 less-than-ideal 通信。

复杂但最优

相反,您可以明确地将数据分散到集群中

[future] = client.scatter([log])

虽然这会变得更加复杂 API,所以我将向您指出文档

http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html