Dask DataFrame:对具有多行的 groupby 对象重新采样
Dask DataFrame: Resample over groupby object with multiple rows
我从 Castra 创建了以下 dask 数据框:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
产量:
user_id / ts / text
ts
2015-08-08 01:10:00 9235 2015-08-08 01:10:00 a
2015-08-08 02:20:00 2353 2015-08-08 02:20:00 b
2015-08-08 02:20:00 9235 2015-08-08 02:20:00 c
2015-08-08 04:10:00 9235 2015-08-08 04:10:00 d
2015-08-08 08:10:00 2353 2015-08-08 08:10:00 e
我想做的是:
- 按
user_id
和 ts
分组
- 在 3 小时内重新采样
- 在重采样步骤中,任何合并的行都应连接文本
示例输出:
text
user_id ts
9235 2015-08-08 00:00:00 ac
2015-08-08 03:00:00 d
2353 2015-08-08 00:00:00 b
2015-08-08 06:00:00 e
我尝试了以下方法:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()
并出现以下错误:
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex
我尝试在管道中传递 set_index('ts')
,但它似乎不是 Series
的属性。
关于如何实现这一点有什么想法吗?
TL;DR
如果这能让问题更简单,我也可以更改我创建的 Castra DB 的格式。我目前的实现主要取自 this 伟大的 post.
我设置索引(在to_df()
函数中)如下:
df.set_index('ts',drop=False,inplace=True)
并且拥有:
with BZ2File(os.path.join(S.DATA_DIR,filename)) as f:
batches = partition_all(batch_size, f)
df, frames = peek(map(self.to_df, batches))
castra = Castra(S.CASTRA, template=df, categories=categories)
castra.extend_sequence(frames, freq='3h')
这里是生成的数据类型:
ts datetime64[ns]
text object
user_id float64
尝试将您的索引转换为 DatetimeIndex,如下所示:
import datetime
# ...
df.index = dd.DatetimeIndex(df.index.map(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S')))
# ...
如果我们可以假设每个 user-id
组都可以放入内存,那么我建议使用 dask.dataframe 进行外部分组,然后使用 pandas 在每个组中进行操作组,如下所示。
def per_group(blk):
return blk.groupby('ts').text.resample('3H', how='sum')
df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()
这将两个困难的事情分离到两个不同的项目中
- dask.dataframe
将所有用户 ID 一起洗牌到正确的组中
- 在每个组内进行复杂的日期时间重采样由 pandas 明确处理。
理想情况下 dask.dataframe 会自动为您编写每组函数。目前 dask.dataframe 不智能处理多索引,或在多列 groupbys 之上重采样,因此自动解决方案尚不可用。不过,很有可能退回到 pandas 进行每块计算,同时仍然使用 dask.dataframe 相应地准备组。
我从 Castra 创建了以下 dask 数据框:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
产量:
user_id / ts / text
ts
2015-08-08 01:10:00 9235 2015-08-08 01:10:00 a
2015-08-08 02:20:00 2353 2015-08-08 02:20:00 b
2015-08-08 02:20:00 9235 2015-08-08 02:20:00 c
2015-08-08 04:10:00 9235 2015-08-08 04:10:00 d
2015-08-08 08:10:00 2353 2015-08-08 08:10:00 e
我想做的是:
- 按
user_id
和ts
分组
- 在 3 小时内重新采样
- 在重采样步骤中,任何合并的行都应连接文本
示例输出:
text
user_id ts
9235 2015-08-08 00:00:00 ac
2015-08-08 03:00:00 d
2353 2015-08-08 00:00:00 b
2015-08-08 06:00:00 e
我尝试了以下方法:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()
并出现以下错误:
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex
我尝试在管道中传递 set_index('ts')
,但它似乎不是 Series
的属性。
关于如何实现这一点有什么想法吗?
TL;DR
如果这能让问题更简单,我也可以更改我创建的 Castra DB 的格式。我目前的实现主要取自 this 伟大的 post.
我设置索引(在to_df()
函数中)如下:
df.set_index('ts',drop=False,inplace=True)
并且拥有:
with BZ2File(os.path.join(S.DATA_DIR,filename)) as f:
batches = partition_all(batch_size, f)
df, frames = peek(map(self.to_df, batches))
castra = Castra(S.CASTRA, template=df, categories=categories)
castra.extend_sequence(frames, freq='3h')
这里是生成的数据类型:
ts datetime64[ns]
text object
user_id float64
尝试将您的索引转换为 DatetimeIndex,如下所示:
import datetime
# ...
df.index = dd.DatetimeIndex(df.index.map(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S')))
# ...
如果我们可以假设每个 user-id
组都可以放入内存,那么我建议使用 dask.dataframe 进行外部分组,然后使用 pandas 在每个组中进行操作组,如下所示。
def per_group(blk):
return blk.groupby('ts').text.resample('3H', how='sum')
df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()
这将两个困难的事情分离到两个不同的项目中
- dask.dataframe 将所有用户 ID 一起洗牌到正确的组中
- 在每个组内进行复杂的日期时间重采样由 pandas 明确处理。
理想情况下 dask.dataframe 会自动为您编写每组函数。目前 dask.dataframe 不智能处理多索引,或在多列 groupbys 之上重采样,因此自动解决方案尚不可用。不过,很有可能退回到 pandas 进行每块计算,同时仍然使用 dask.dataframe 相应地准备组。