dask 分布式数据帧上的慢 len 函数
Slow len function on dask distributed dataframe
我一直在测试如何使用 dask(具有 20 个内核的集群),我对调用 len 函数与通过 loc 切片的速度感到惊讶。
import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')
log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)
#This is the code than runs slowly
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)
print(len(logd))
#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()
知道为什么会这样吗? len 运行得快对我来说并不重要,但我觉得由于不理解这种行为,所以我对库有一些不了解的地方。
所有绿色框对应于 "from_pandas",而在 Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes 的这篇文章中,调用图看起来更好(len_chunk 被调用,速度明显更快,调用不'似乎被锁定并等待一个工人完成他的任务,然后再开始另一个)
好问题,这涉及到数据何时向上移动到集群并返回到客户端(您的 python 会话)的几点。让我们看看您计算的几个阶段
使用 Pandas
加载数据
这是您 python 会话中的 Pandas 数据框,因此它显然仍在您的本地进程中。
log = pd.read_csv('800000test', sep='\t') # on client
转换为懒惰 Dask.dataframe
这会将您的 Pandas 数据帧分解为二十个 Pandas 数据帧,但这些数据帧仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。
logd = dd.from_pandas(log,npartitions=20) # still on client
计算长度
调用 len
实际上会导致此处的计算(通常您会使用 df.some_aggregation().compute()
。所以现在 Dask 开始了。首先它将您的数据移出到集群(慢)然后它调用 len在 20 个分区中(快速),它聚合这些分区(快速),然后将结果向下移动到您的客户端,以便它可以打印。
print(len(logd)) # costly roundtrip client -> cluster -> client
分析
所以这里的问题是我们的 dask.dataframe 仍然在本地 python 会话中拥有所有数据。
使用本地线程调度程序比分布式调度程序要快得多。这应该以毫秒计算
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads
print(len(logd)) # stays on client
但您可能想知道如何扩展到更大的数据集,所以让我们以正确的方式进行。
加载你的工人数据
让 Dask 工作人员加载 csv 文件的位,而不是在您的 client/local 会话中加载 Pandas。这样就不需要 client-worker 通信了。
# log = pd.read_csv('800000test', sep='\t') # on client
log = dd.read_csv('800000test', sep='\t') # on cluster workers
然而,与 pd.read_csv
不同的是,dd.read_csv
是懒惰的,所以这应该 return 几乎立即。我们可以强制 Dask 使用 persist 方法实际进行计算
log = client.persist(log) # triggers computation asynchronously
现在集群开始运行并直接在工作进程中加载您的数据。这个比较快。请注意,此方法 return 会立即在后台进行工作。如果您想等到它完成,请调用 wait
.
from dask.distributed import wait
wait(log) # blocks until read is done
如果您使用小型数据集进行测试并希望获得更多分区,请尝试更改块大小。
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
无论如何,log
上的操作现在应该很快
len(log) # fast
编辑
为了回答关于 this blogpost 的问题,这里是我们对文件所在位置所做的假设。
通常,当您向 dd.read_csv
提供文件名时,它假定该文件对所有工作人员都是可见的。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您需要使用绝对路径(如 /path/to/myfile.*.csv
)或者确保您的工作人员和客户端具有相同的工作目录。
如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。
简单但sub-optimal
简单的方法就是做你最初做的事情,但坚持你的dask.dataframe
log = pd.read_csv('800000test', sep='\t') # on client
logd = dd.from_pandas(log,npartitions=20) # still on client
logd = client.persist(logd) # moves to workers
这很好,但会导致轻微的 less-than-ideal 通信。
复杂但最优
相反,您可以明确地将数据分散到集群中
[future] = client.scatter([log])
虽然这会变得更加复杂 API,所以我将向您指出文档
http://distributed.readthedocs.io/en/latest/manage-computation.html
http://distributed.readthedocs.io/en/latest/memory.html
http://dask.pydata.org/en/latest/delayed-collections.html
我一直在测试如何使用 dask(具有 20 个内核的集群),我对调用 len 函数与通过 loc 切片的速度感到惊讶。
import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')
log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)
#This is the code than runs slowly
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)
print(len(logd))
#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()
知道为什么会这样吗? len 运行得快对我来说并不重要,但我觉得由于不理解这种行为,所以我对库有一些不了解的地方。
所有绿色框对应于 "from_pandas",而在 Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes 的这篇文章中,调用图看起来更好(len_chunk 被调用,速度明显更快,调用不'似乎被锁定并等待一个工人完成他的任务,然后再开始另一个)
好问题,这涉及到数据何时向上移动到集群并返回到客户端(您的 python 会话)的几点。让我们看看您计算的几个阶段
使用 Pandas
加载数据这是您 python 会话中的 Pandas 数据框,因此它显然仍在您的本地进程中。
log = pd.read_csv('800000test', sep='\t') # on client
转换为懒惰 Dask.dataframe
这会将您的 Pandas 数据帧分解为二十个 Pandas 数据帧,但这些数据帧仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。
logd = dd.from_pandas(log,npartitions=20) # still on client
计算长度
调用 len
实际上会导致此处的计算(通常您会使用 df.some_aggregation().compute()
。所以现在 Dask 开始了。首先它将您的数据移出到集群(慢)然后它调用 len在 20 个分区中(快速),它聚合这些分区(快速),然后将结果向下移动到您的客户端,以便它可以打印。
print(len(logd)) # costly roundtrip client -> cluster -> client
分析
所以这里的问题是我们的 dask.dataframe 仍然在本地 python 会话中拥有所有数据。
使用本地线程调度程序比分布式调度程序要快得多。这应该以毫秒计算
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads
print(len(logd)) # stays on client
但您可能想知道如何扩展到更大的数据集,所以让我们以正确的方式进行。
加载你的工人数据
让 Dask 工作人员加载 csv 文件的位,而不是在您的 client/local 会话中加载 Pandas。这样就不需要 client-worker 通信了。
# log = pd.read_csv('800000test', sep='\t') # on client
log = dd.read_csv('800000test', sep='\t') # on cluster workers
然而,与 pd.read_csv
不同的是,dd.read_csv
是懒惰的,所以这应该 return 几乎立即。我们可以强制 Dask 使用 persist 方法实际进行计算
log = client.persist(log) # triggers computation asynchronously
现在集群开始运行并直接在工作进程中加载您的数据。这个比较快。请注意,此方法 return 会立即在后台进行工作。如果您想等到它完成,请调用 wait
.
from dask.distributed import wait
wait(log) # blocks until read is done
如果您使用小型数据集进行测试并希望获得更多分区,请尝试更改块大小。
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
无论如何,log
上的操作现在应该很快
len(log) # fast
编辑
为了回答关于 this blogpost 的问题,这里是我们对文件所在位置所做的假设。
通常,当您向 dd.read_csv
提供文件名时,它假定该文件对所有工作人员都是可见的。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您需要使用绝对路径(如 /path/to/myfile.*.csv
)或者确保您的工作人员和客户端具有相同的工作目录。
如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。
简单但sub-optimal
简单的方法就是做你最初做的事情,但坚持你的dask.dataframe
log = pd.read_csv('800000test', sep='\t') # on client
logd = dd.from_pandas(log,npartitions=20) # still on client
logd = client.persist(logd) # moves to workers
这很好,但会导致轻微的 less-than-ideal 通信。
复杂但最优
相反,您可以明确地将数据分散到集群中
[future] = client.scatter([log])
虽然这会变得更加复杂 API,所以我将向您指出文档
http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html