如何将 python dask 的输出(来自 xarray)保存到 pandas 数据帧中
How to save outputs (from xarray) from python dask delayed into a pandas dataframe
我对尝试并行化我的 python 代码还很陌生。我正在尝试对 xarray 执行一些分析,然后用结果填充 pandas 数据框。数据框的列是独立的,所以我认为使用 dask delayed 进行并行化应该是微不足道的,但无法弄清楚如何。我的 xarrays 很大,所以这个循环需要一段时间,而且内存很大。它也可以按时间分块,如果这样更容易的话(这可能有助于记忆)!
这是未并行化的版本:
from time import sleep
import time
import pandas as pd
import dask.dataframe as dd
data1 = np.random.rand(4, 3,3)
data2=np.random.randint(4,size=(3,3))
locs1 = ["IA", "IL", "IN"]
locs2 = ['a', 'b', 'c']
times = pd.date_range("2000-01-01", periods=4)
xarray1 = xr.DataArray(data1, coords=[times, locs1, locs2], dims=["time", "space1", "space2"])
xarray2= xr.DataArray(data2, coords=[locs1, locs2], dims=[ "space1", "space2"])
def delayed_where(xarray1,xarray2,id):
sleep(1)
return xarray1.where(xarray2==id).mean(axis=(1,2)).to_dataframe(id)
final_df=pd.DataFrame(columns=range(4),index=times)
for column in final_df:
final_df[column]=delayed_where(xarray1,xarray2,column)
我想并行化 for 循环,但已经尝试过:
final_df_delayed=pd.DataFrame(columns=range(4),index=times)
for column in final_df:
final_df_delayed[column]=delayed(delayed_where)(xarray1,xarray2,column)
final_df.compute()
或者可能有延迟的事情?
final_df_dd=dd.from_pandas(final_df, npartitions=2)
for column in final_df:
final_df_dd[column]=delayed(delayed_where)(xarray1,xarray2,column)
final_df_dd.compute()
但是 none 这些工作。有人可以帮忙吗?
您正确使用了 delayed,但无法按照您指定的方式构建 dask 数据帧。
from dask import delayed
import dask
@delayed
def delayed_where(xarray1,xarray2,id):
sleep(1)
return xarray1.where(xarray2==id).mean(axis=(1,2)).to_dataframe(id)
@delayed
def form_df(list_col_results):
final_df=pd.DataFrame(columns=range(4),index=times)
for n, column in enumerate(final_df):
final_df[column]=list_col_results[n]
return final_df
delayed_cols = [delayed_where(xarray1,xarray2, col) for col in final_df.columns]
delayed_df = form_df(delayed_cols)
delayed_df.compute()
请注意,枚举是获得列的正确顺序的笨拙方法,但您的实际问题可能会引导您找到更好的指定方法(例如,通过将每一列明确指定为单独的参数)。
我对尝试并行化我的 python 代码还很陌生。我正在尝试对 xarray 执行一些分析,然后用结果填充 pandas 数据框。数据框的列是独立的,所以我认为使用 dask delayed 进行并行化应该是微不足道的,但无法弄清楚如何。我的 xarrays 很大,所以这个循环需要一段时间,而且内存很大。它也可以按时间分块,如果这样更容易的话(这可能有助于记忆)!
这是未并行化的版本:
from time import sleep
import time
import pandas as pd
import dask.dataframe as dd
data1 = np.random.rand(4, 3,3)
data2=np.random.randint(4,size=(3,3))
locs1 = ["IA", "IL", "IN"]
locs2 = ['a', 'b', 'c']
times = pd.date_range("2000-01-01", periods=4)
xarray1 = xr.DataArray(data1, coords=[times, locs1, locs2], dims=["time", "space1", "space2"])
xarray2= xr.DataArray(data2, coords=[locs1, locs2], dims=[ "space1", "space2"])
def delayed_where(xarray1,xarray2,id):
sleep(1)
return xarray1.where(xarray2==id).mean(axis=(1,2)).to_dataframe(id)
final_df=pd.DataFrame(columns=range(4),index=times)
for column in final_df:
final_df[column]=delayed_where(xarray1,xarray2,column)
我想并行化 for 循环,但已经尝试过:
final_df_delayed=pd.DataFrame(columns=range(4),index=times)
for column in final_df:
final_df_delayed[column]=delayed(delayed_where)(xarray1,xarray2,column)
final_df.compute()
或者可能有延迟的事情?
final_df_dd=dd.from_pandas(final_df, npartitions=2)
for column in final_df:
final_df_dd[column]=delayed(delayed_where)(xarray1,xarray2,column)
final_df_dd.compute()
但是 none 这些工作。有人可以帮忙吗?
您正确使用了 delayed,但无法按照您指定的方式构建 dask 数据帧。
from dask import delayed
import dask
@delayed
def delayed_where(xarray1,xarray2,id):
sleep(1)
return xarray1.where(xarray2==id).mean(axis=(1,2)).to_dataframe(id)
@delayed
def form_df(list_col_results):
final_df=pd.DataFrame(columns=range(4),index=times)
for n, column in enumerate(final_df):
final_df[column]=list_col_results[n]
return final_df
delayed_cols = [delayed_where(xarray1,xarray2, col) for col in final_df.columns]
delayed_df = form_df(delayed_cols)
delayed_df.compute()
请注意,枚举是获得列的正确顺序的笨拙方法,但您的实际问题可能会引导您找到更好的指定方法(例如,通过将每一列明确指定为单独的参数)。