我可以加速这个简单的 Dask 脚本以获得跨多个 Feather 数据帧的总行数吗?
Can I speed up this simple Dask script to get a total row count across multiple Feather dataframes?
我在C:\script\data\YYYY\MM\data.feather
中有数据
为了更好地理解 Dask,我正在尝试优化一个简单的脚本,该脚本从每个文件中获取行数并将它们相加。 200 个文件中有近 1 亿行。
import dask.dataframe as dd
import feather
from dask.distributed import Client,LocalCluster
from dask import delayed
counts = []
with LocalCluster() as cluster, Client(cluster) as client:
for f in dates:
df = delayed(feather.read_feather)(f'data\{f.year}\{f.month:02}\data.feather',columns=['colA','colB'])
counts.append(df.shape[0])
tot = sum(counts)
print(dd.compute(tot))
我在 read_feather 中包含了 colA 和 colB,因为我想最终转向计算不同时间跨度内的不同值。我在任务流中看到的是 read_feather 每次大约 2 秒时触发,以及一些 disk-write-read_feather
和 disk-read-getattr
每次触发 2-4 秒。如果我 运行 read_feather 在它自己的单个文件脚本中,大约需要 0.4 秒才能完成。 read_feather 的额外时间是不是因为并行?另外, disk-*
任务是否意味着它正在读取文件,然后写入磁盘,然后再次读取?我原以为 Dask 会读入文件,计算行数,然后存储该数字以供稍后求和。
disk-write-read_feather
和 disk-read-getattr
的出现(在您的仪表板视图中应该是红色的)表明数据帧在内存中很大并且确实被缓存到磁盘 - 这在这方面完全没有帮助案例.
当你这样做时
df = ...
df.shape[0]
这会为每个分区创建一个包含三个延迟调用(即任务)的链,一个用于加载,一个用于 select 属性,一个用于 select 索引。这些调用似乎没有被融合成一个,而且你有很大的中间结果。
相反,您可以确保合并调用,并且不会产生大的结果:
@dask.delayed
def get_size(f):
df = feather.read_feather(f'data\{f.year}\{f.month:02}\data.feather',columns=['colA','colB'])
return df.shape[0]
counts = [get_size(f) for f in dates]
但是,我不知道加载羽化是否可以很好地处理线程,或者您是否只是在最大化 IO。您的仪表板和系统监控工具可能会告诉您更多信息。
我在C:\script\data\YYYY\MM\data.feather
为了更好地理解 Dask,我正在尝试优化一个简单的脚本,该脚本从每个文件中获取行数并将它们相加。 200 个文件中有近 1 亿行。
import dask.dataframe as dd
import feather
from dask.distributed import Client,LocalCluster
from dask import delayed
counts = []
with LocalCluster() as cluster, Client(cluster) as client:
for f in dates:
df = delayed(feather.read_feather)(f'data\{f.year}\{f.month:02}\data.feather',columns=['colA','colB'])
counts.append(df.shape[0])
tot = sum(counts)
print(dd.compute(tot))
我在 read_feather 中包含了 colA 和 colB,因为我想最终转向计算不同时间跨度内的不同值。我在任务流中看到的是 read_feather 每次大约 2 秒时触发,以及一些 disk-write-read_feather
和 disk-read-getattr
每次触发 2-4 秒。如果我 运行 read_feather 在它自己的单个文件脚本中,大约需要 0.4 秒才能完成。 read_feather 的额外时间是不是因为并行?另外, disk-*
任务是否意味着它正在读取文件,然后写入磁盘,然后再次读取?我原以为 Dask 会读入文件,计算行数,然后存储该数字以供稍后求和。
disk-write-read_feather
和 disk-read-getattr
的出现(在您的仪表板视图中应该是红色的)表明数据帧在内存中很大并且确实被缓存到磁盘 - 这在这方面完全没有帮助案例.
当你这样做时
df = ...
df.shape[0]
这会为每个分区创建一个包含三个延迟调用(即任务)的链,一个用于加载,一个用于 select 属性,一个用于 select 索引。这些调用似乎没有被融合成一个,而且你有很大的中间结果。
相反,您可以确保合并调用,并且不会产生大的结果:
@dask.delayed
def get_size(f):
df = feather.read_feather(f'data\{f.year}\{f.month:02}\data.feather',columns=['colA','colB'])
return df.shape[0]
counts = [get_size(f) for f in dates]
但是,我不知道加载羽化是否可以很好地处理线程,或者您是否只是在最大化 IO。您的仪表板和系统监控工具可能会告诉您更多信息。