在 Dask 中重用中间结果(混合延迟和 dask.dataframe)
Re-using intermediate results in Dask (mixing delayed and dask.dataframe)
根据我在 收到的答复,我编写了一个 ETL 过程,如下所示:
import pandas as pd
from dask import delayed
from dask import dataframe as dd
def preprocess_files(filename):
"""Reads file, collects metadata and identifies lines not containing data.
"""
...
return filename, metadata, skiprows
def load_file(filename, skiprows):
"""Loads the file into a pandas dataframe, skipping lines not containing data."""
return df
def process_errors(filename, skiplines):
"""Calculates error metrics based on the information
collected in the pre-processing step
"""
...
def process_metadata(filename, metadata):
"""Analyses metadata collected in the pre-processing step."""
...
values = [delayed(preprocess_files)(fn) for fn in file_names]
filenames = [value[0] for value in values]
metadata = [value[1] for value in values]
skiprows = [value[2] for value in values]
error_results = [delayed(process_errors)(arg[0], arg[1])
for arg in zip(filenames, skiprows)]
meta_results = [delayed(process_metadata)(arg[0], arg[1])
for arg in zip(filenames, metadata)]
dfs = [delayed(load_file)(arg[0], arg[1])
for arg in zip(filenames, skiprows)]
... # several delayed transformations defined on individual dataframes
# finally: categorize several dataframe columns and write them to HDF5
dfs = dd.from_delayed(dfs, meta=metaframe)
dfs.categorize(columns=[...]) # I would like to delay this
dfs.to_hdf(hdf_file_name, '/data',...) # I would also like to delay this
all_operations = error_results + meta_results # + delayed operations on dask dataframe
# trigger all computation at once,
# allow re-using of data collected in the pre-processing step.
dask.compute(*all_operations)
ETL 过程经历几个步骤:
- 预处理文件,识别不包含任何相关数据的行并解析元数据
- 使用收集到的信息、处理错误信息、元数据并将数据行并行加载到 pandas 数据帧中(重新使用预处理步骤的结果)。操作 (
process_metadata
、process_errors
、load_file
) 具有共享数据依赖性,因为它们都使用在预处理步骤中收集的信息。理想情况下,预处理步骤只会 运行 一次,并且跨进程共享结果。
- 最终,将 pandas 数据帧收集到一个 dask 数据帧中,将它们分类并写入 hdf。
我遇到的问题是,categorize
和 to_hdf
立即触发计算,丢弃元数据和错误数据,否则 process_errors
和 process_metadata
。
有人告诉我延迟 dask.dataframes
上的操作会导致问题,这就是为什么我很想知道是否有可能触发整个计算(处理元数据、处理错误、加载数据帧) , 转换数据帧并将它们存储为 HDF 格式),允许不同的进程共享在预处理阶段收集的数据。
有两种方法可以解决您的问题:
- 延迟一切
- 分阶段计算
延迟一切
to_hdf 调用接受一个 compute=
关键字参数,您可以将其设置为 False。如果为 False,它将返回一个 dask.delayed
值,您可以随时计算该值。
然而,如果您想继续使用 dask.dataframe,则需要立即计算分类调用。如果不立即或多或少地检查数据,我们无法创建一致的 dask.dataframe。 Pandas 最近围绕联合分类的改进将使我们能够在未来改变这一点,但现在你被困住了。如果这对您来说是一个障碍,那么您将不得不切换到 dask.delayed
并使用 df.to_delayed()
手动处理一些事情
分阶段计算
如果使用distributed scheduler you can stage your computation by using the .persist
method。
from dask.distributed import Executor
e = Executor() # make a local "cluster" on your laptop
delayed_values = e.persist(*delayed_values)
... define further computations on delayed values ...
results = dask.compute(results) # compute as normal
这将让您触发一些计算,并且仍然让您继续定义您的计算。您坚持的价值观将保留在内存中。
根据我在
import pandas as pd
from dask import delayed
from dask import dataframe as dd
def preprocess_files(filename):
"""Reads file, collects metadata and identifies lines not containing data.
"""
...
return filename, metadata, skiprows
def load_file(filename, skiprows):
"""Loads the file into a pandas dataframe, skipping lines not containing data."""
return df
def process_errors(filename, skiplines):
"""Calculates error metrics based on the information
collected in the pre-processing step
"""
...
def process_metadata(filename, metadata):
"""Analyses metadata collected in the pre-processing step."""
...
values = [delayed(preprocess_files)(fn) for fn in file_names]
filenames = [value[0] for value in values]
metadata = [value[1] for value in values]
skiprows = [value[2] for value in values]
error_results = [delayed(process_errors)(arg[0], arg[1])
for arg in zip(filenames, skiprows)]
meta_results = [delayed(process_metadata)(arg[0], arg[1])
for arg in zip(filenames, metadata)]
dfs = [delayed(load_file)(arg[0], arg[1])
for arg in zip(filenames, skiprows)]
... # several delayed transformations defined on individual dataframes
# finally: categorize several dataframe columns and write them to HDF5
dfs = dd.from_delayed(dfs, meta=metaframe)
dfs.categorize(columns=[...]) # I would like to delay this
dfs.to_hdf(hdf_file_name, '/data',...) # I would also like to delay this
all_operations = error_results + meta_results # + delayed operations on dask dataframe
# trigger all computation at once,
# allow re-using of data collected in the pre-processing step.
dask.compute(*all_operations)
ETL 过程经历几个步骤:
- 预处理文件,识别不包含任何相关数据的行并解析元数据
- 使用收集到的信息、处理错误信息、元数据并将数据行并行加载到 pandas 数据帧中(重新使用预处理步骤的结果)。操作 (
process_metadata
、process_errors
、load_file
) 具有共享数据依赖性,因为它们都使用在预处理步骤中收集的信息。理想情况下,预处理步骤只会 运行 一次,并且跨进程共享结果。 - 最终,将 pandas 数据帧收集到一个 dask 数据帧中,将它们分类并写入 hdf。
我遇到的问题是,categorize
和 to_hdf
立即触发计算,丢弃元数据和错误数据,否则 process_errors
和 process_metadata
。
有人告诉我延迟 dask.dataframes
上的操作会导致问题,这就是为什么我很想知道是否有可能触发整个计算(处理元数据、处理错误、加载数据帧) , 转换数据帧并将它们存储为 HDF 格式),允许不同的进程共享在预处理阶段收集的数据。
有两种方法可以解决您的问题:
- 延迟一切
- 分阶段计算
延迟一切
to_hdf 调用接受一个 compute=
关键字参数,您可以将其设置为 False。如果为 False,它将返回一个 dask.delayed
值,您可以随时计算该值。
然而,如果您想继续使用 dask.dataframe,则需要立即计算分类调用。如果不立即或多或少地检查数据,我们无法创建一致的 dask.dataframe。 Pandas 最近围绕联合分类的改进将使我们能够在未来改变这一点,但现在你被困住了。如果这对您来说是一个障碍,那么您将不得不切换到 dask.delayed
并使用 df.to_delayed()
分阶段计算
如果使用distributed scheduler you can stage your computation by using the .persist
method。
from dask.distributed import Executor
e = Executor() # make a local "cluster" on your laptop
delayed_values = e.persist(*delayed_values)
... define further computations on delayed values ...
results = dask.compute(results) # compute as normal
这将让您触发一些计算,并且仍然让您继续定义您的计算。您坚持的价值观将保留在内存中。