如何使用 dask 高效地并行化时间序列预测?
How to efficiently parallelize time series forecasting using dask?
我正在尝试使用 dask 在 python 中并行化时间序列预测。数据的格式是每个时间序列都是一列,它们有一个共同的每月日期索引。我有一个自定义预测函数,它 return 是一个具有拟合值和预测值的时间序列对象。我想将此功能应用于数据框的所有列(所有时间序列)和 return 一个包含所有这些系列的新数据框,以便上传到数据库。
我已经通过 运行:
获得了代码
data = pandas_df.copy()
ddata = dd.from_pandas(data, npartitions=1)
res = ddata.map_partitions(lambda df: df.apply(forecast_func,
axis=0)).compute(get=dask.multiprocessing.get)
我的问题是,Dask 中是否有一种方法可以按列而不是按行进行分区,因为在这个用例中,我需要保持有序时间索引,以便预测功能正常工作。
如果不是,我将如何重新格式化数据以允许高效的大规模预测成为可能,并且仍然return我需要的格式的数据然后推送到数据库?
example of data format
Dask dataframe 仅按行对数据进行分区。见 Dask dataframe documentation
Dask array 但是可以沿任何维度进行分区。尽管您使用了 Numpy 语义而不是 Pandas 语义。
你可以做任何你想做的事情 dask delayed or futures. This parallel computing example 更通用的教程可能会给你一些想法。
感谢您的帮助,我真的很感激。我已经使用了 dask.delayed 解决方案并且它工作得非常好,只需要使用本地集群大约 1/3 的时间。
任何对我实施的解决方案感兴趣的人:
from dask.distributed import Client, LocalCluster
import pandas as pd
import dask
cluster = LocalCluster(n_workers=3,ncores=3)
client = Client(cluster)
#get list of time series back
output = []
for i in small_df:
forecasted_series = dask.delayed(custom_forecast_func)(small_df[i])
output.append(forecasted_series)
total = dask.delayed(output).compute()
#combine list of series into 1 dataframe
full_df = pd.concat(total,ignore_index=False,keys=small_df.columns,names=['time_series_names','Date'])
final_df = full_df.to_frame().reset_index()
final_df.columns = ['time_series_names','Date','value_variable']
final_df.head()
这为您提供了融化的数据框结构,因此如果您希望该系列成为列,您可以使用
对其进行转换
pivoted_df = final_df.pivot(index='Date', columns='time_series_names', values='value_variable')
small_df is in this format in pandas dataframe with Date being the index
我正在尝试使用 dask 在 python 中并行化时间序列预测。数据的格式是每个时间序列都是一列,它们有一个共同的每月日期索引。我有一个自定义预测函数,它 return 是一个具有拟合值和预测值的时间序列对象。我想将此功能应用于数据框的所有列(所有时间序列)和 return 一个包含所有这些系列的新数据框,以便上传到数据库。 我已经通过 运行:
获得了代码data = pandas_df.copy()
ddata = dd.from_pandas(data, npartitions=1)
res = ddata.map_partitions(lambda df: df.apply(forecast_func,
axis=0)).compute(get=dask.multiprocessing.get)
我的问题是,Dask 中是否有一种方法可以按列而不是按行进行分区,因为在这个用例中,我需要保持有序时间索引,以便预测功能正常工作。
如果不是,我将如何重新格式化数据以允许高效的大规模预测成为可能,并且仍然return我需要的格式的数据然后推送到数据库?
example of data format
Dask dataframe 仅按行对数据进行分区。见 Dask dataframe documentation
Dask array 但是可以沿任何维度进行分区。尽管您使用了 Numpy 语义而不是 Pandas 语义。
你可以做任何你想做的事情 dask delayed or futures. This parallel computing example 更通用的教程可能会给你一些想法。
感谢您的帮助,我真的很感激。我已经使用了 dask.delayed 解决方案并且它工作得非常好,只需要使用本地集群大约 1/3 的时间。
任何对我实施的解决方案感兴趣的人:
from dask.distributed import Client, LocalCluster
import pandas as pd
import dask
cluster = LocalCluster(n_workers=3,ncores=3)
client = Client(cluster)
#get list of time series back
output = []
for i in small_df:
forecasted_series = dask.delayed(custom_forecast_func)(small_df[i])
output.append(forecasted_series)
total = dask.delayed(output).compute()
#combine list of series into 1 dataframe
full_df = pd.concat(total,ignore_index=False,keys=small_df.columns,names=['time_series_names','Date'])
final_df = full_df.to_frame().reset_index()
final_df.columns = ['time_series_names','Date','value_variable']
final_df.head()
这为您提供了融化的数据框结构,因此如果您希望该系列成为列,您可以使用
对其进行转换pivoted_df = final_df.pivot(index='Date', columns='time_series_names', values='value_variable')
small_df is in this format in pandas dataframe with Date being the index