Python PANDAS:从 pandas/numpy 转换为 dask dataframe/array

Python PANDAS: Converting from pandas/numpy to dask dataframe/array

我正在尝试使用优秀的 dask 库将程序转换为 parallelizable/multithreaded。这是我正在转换的程序:

import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

df_test = df_test.loc[np.repeat(df_test.index, df_test['units'])]
df_test['transaction_dt'] += pd.to_timedelta(df_test.groupby(level=0).cumcount(), unit='d')
df_test = df_test.reset_index(drop=True)

预期结果:

id,transaction_dt,measures
1,2018-01-01,30.5
1,2018-01-02,30.5
1,2018-01-03,30.5
1,2018-01-04,30.5
1,2018-01-03,26.3
1,2018-01-04,26.3
1,2018-01-05,26.3
1,2018-01-06,26.3
2,2018-01-01,12.7
2,2018-01-02,12.7
2,2018-01-03,12.7
2,2018-01-03,8.8
2,2018-01-04,8.8
2,2018-01-05,8.8 

我突然想到,这可能是尝试并行化的一个很好的候选者,因为单独的 dask 分区不需要了解彼此的任何信息即可完成所需的操作。这是我认为它可能如何工作的天真表示:

dd_test = dd.from_pandas(df_test, npartitions=3)

dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] += dd_test.to_timedelta(dd.groupby(level=0).cumcount(), unit='d')
dd_test = dd_test.reset_index(drop=True)

到目前为止,我一直在努力解决以下错误或惯用差异:

  1. "NotImplementedError: Only integer valued repeats supported." 我试图将索引转换为 int column/array 来尝试,但仍然 运行 成为问题。

2. dask不支持变异运算符:"+="

3. 没有 dask .to_timedelta() 参数

4. 没有 dask .cumcount()(但我认为 .cumsum() 可以互换?!)

如果有任何 dask 专家可以让我知道是否存在阻止我尝试此操作的基本障碍或任何实施技巧,那将是一个很大的帮助!

编辑:

自发布问题以来,我认为我在这方面取得了一些进展:

dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1

dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] = dd_test['transaction_dt'] + (dd.test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]') 
dd_test = dd_test.reset_index(drop=True)

但是,我仍然卡在 dask array repeats 错误上。仍然欢迎任何提示。

不确定这是否正是您要查找的内容,但我将 da.repeat 替换为使用 np.repeat,并将 dd_test.indexdd_test['units'] 显式转换为numpy 数组,最后将 dd_test['transaction_dt'].astype('M8[us]') 添加到您的 timedelta 计算中。

df_test = pd.read_csv(StringIO(test_data), sep=',')

dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1

dd_test = dd_test.loc[np.repeat(np.array(dd_test.index), 
np.array(dd_test['units']))]
dd_test['transaction_dt'] = dd_test['transaction_dt'].astype('M8[us]') + (dd_test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)

df_expected = dd_test.compute()