如何在 Numpy Busday_count 中使用 Dask DataFrame?
How to use Dask DataFrame with Numpy Busday_count?
我正在努力将 Pandas/Numpy 代码转换为 Dask 以处理更大的数据集。我似乎无法重新创建以下 Pandas/Numpy 代码:
df['days_to_complete'] = np.busday_count(begindates=df['time_order_date'].values.astype('datetime64[D]'),enddates=df['time_complete_date'],weekmask='1111111',holidays=hols_list)
这 returns 是 time_order_date 和 time_complete_date 之间的整数天数,同时考虑工作周和假期列表。它在我的数据框中创建并填充了一个新列,没问题。
在 Dask 中,我尝试了以下方法:
map_partitions调用numpy函数:
ddf['days_to_complete'] = ddf.time_order.map_partitions(func=np.busday_count,args= ddf['time_order_date'].values.astype('datetime64[D]'),ddf['time_complete_date']),meta=(None, 'i8'))
也map_partitions 使用 lambda:
ddf['days_to_complete'] = ddf.map_partitions(lambda ddf: ddf.assign(result = np.busday_count(begindates=ddf['time_order_date'].values.astype('datetime64[D]'),enddates=ddf['time_complete_date'],weekmask='1111111',holidays=hols_list)),meta=(None,'i8'))
并在运行 ddf.compute() 后得到以下错误:
TypeError: busday_count() got multiple values for argument 'begindates'
如何以并行处理/Dask 友好的方式最好地使用此 numpy 函数?
我没有成功使用 Dask docs/examples 或其他 SO 线程。
我还想使用 Pandas CustomBusinessHour rollfoward,就像我在此处的基本 pandas 工作一样:
bis_hour = CustomBusinessHour(n=1,weekmask='Mon Tue Wed Thu Fri Sat Sun',holidays=hols_list,start = bus_hours_start,end = bus_hours_end,offset=0)
df['time_order_bis'] = pd.to_datetime(df['time_order'])
df['time_order_bis'] = df['time_order_bis'].apply(lambda row: bis_hour.rollforward(row))
此 'rolls forward' 订单时间必须在定义的客户营业时间内(周六订单现在是周一早上 7 点,工作日)。谢谢!
编辑:
我试过编写和调用一个函数:
def bdays(df):
return np.busday_count(df.time_order_date.values.astype('datetime64[D]'),df.time_complete_date,weekmask='1111111',holidays=hols_list)
ddf['days_to_complete'] = ddf.map_partitions(bdays,df=ddf,meta=('days_to_complete','i8')).compute()
我收到以下错误:TypeError: bdays() got multiple values for argument 'df'
我成功了!关键是 return 一个 Dask 数组,不要过早地计算东西,这会破坏类型。我建议做大量的 type() 检查并逐步进行,本质上你想要 Dask 对象,pandas objects/numpy 数组可以打破 partitioning/parallelism.
函数:
def bdays(df=ddf):
return da.from_array(np.busday_count(df.time_order_date,df.time_complete_date,weekmask='1111111',holidays=hols_list))
使用map_partitions。请注意上面函数的第一个参数需要 dataframe/partition -> 我们没有在映射分区中指定它!仅附加参数。
ddf['days_to_complete'] = ddf.map_partitions(bdays,meta=('days_to_complete','i8'))
在分配给我的数据框中的新列之前计算 (compute()) 导致错误。
TypeError: set_index() missing 1 required positional argument: 'other'
调试建议:
测试您的输入并仅使用一个分区测试功能。 bdays 是上面的函数。
type(ddf.map_partitions(bdays,meta='i8'))
output: dask.dataframe.core.Series
我正在努力将 Pandas/Numpy 代码转换为 Dask 以处理更大的数据集。我似乎无法重新创建以下 Pandas/Numpy 代码:
df['days_to_complete'] = np.busday_count(begindates=df['time_order_date'].values.astype('datetime64[D]'),enddates=df['time_complete_date'],weekmask='1111111',holidays=hols_list)
这 returns 是 time_order_date 和 time_complete_date 之间的整数天数,同时考虑工作周和假期列表。它在我的数据框中创建并填充了一个新列,没问题。
在 Dask 中,我尝试了以下方法:
map_partitions调用numpy函数:
ddf['days_to_complete'] = ddf.time_order.map_partitions(func=np.busday_count,args= ddf['time_order_date'].values.astype('datetime64[D]'),ddf['time_complete_date']),meta=(None, 'i8'))
也map_partitions 使用 lambda:
ddf['days_to_complete'] = ddf.map_partitions(lambda ddf: ddf.assign(result = np.busday_count(begindates=ddf['time_order_date'].values.astype('datetime64[D]'),enddates=ddf['time_complete_date'],weekmask='1111111',holidays=hols_list)),meta=(None,'i8'))
并在运行 ddf.compute() 后得到以下错误:
TypeError: busday_count() got multiple values for argument 'begindates'
如何以并行处理/Dask 友好的方式最好地使用此 numpy 函数? 我没有成功使用 Dask docs/examples 或其他 SO 线程。 我还想使用 Pandas CustomBusinessHour rollfoward,就像我在此处的基本 pandas 工作一样:
bis_hour = CustomBusinessHour(n=1,weekmask='Mon Tue Wed Thu Fri Sat Sun',holidays=hols_list,start = bus_hours_start,end = bus_hours_end,offset=0)
df['time_order_bis'] = pd.to_datetime(df['time_order'])
df['time_order_bis'] = df['time_order_bis'].apply(lambda row: bis_hour.rollforward(row))
此 'rolls forward' 订单时间必须在定义的客户营业时间内(周六订单现在是周一早上 7 点,工作日)。谢谢!
编辑: 我试过编写和调用一个函数:
def bdays(df):
return np.busday_count(df.time_order_date.values.astype('datetime64[D]'),df.time_complete_date,weekmask='1111111',holidays=hols_list)
ddf['days_to_complete'] = ddf.map_partitions(bdays,df=ddf,meta=('days_to_complete','i8')).compute()
我收到以下错误:TypeError: bdays() got multiple values for argument 'df'
我成功了!关键是 return 一个 Dask 数组,不要过早地计算东西,这会破坏类型。我建议做大量的 type() 检查并逐步进行,本质上你想要 Dask 对象,pandas objects/numpy 数组可以打破 partitioning/parallelism.
函数:
def bdays(df=ddf):
return da.from_array(np.busday_count(df.time_order_date,df.time_complete_date,weekmask='1111111',holidays=hols_list))
使用map_partitions。请注意上面函数的第一个参数需要 dataframe/partition -> 我们没有在映射分区中指定它!仅附加参数。
ddf['days_to_complete'] = ddf.map_partitions(bdays,meta=('days_to_complete','i8'))
在分配给我的数据框中的新列之前计算 (compute()) 导致错误。
TypeError: set_index() missing 1 required positional argument: 'other'
调试建议: 测试您的输入并仅使用一个分区测试功能。 bdays 是上面的函数。
type(ddf.map_partitions(bdays,meta='i8'))
output: dask.dataframe.core.Series