聚合 Dask 数据框并生成聚合数据框
Aggregate a Dask dataframe and produce a dataframe of aggregates
我有一个如下所示的 Dask 数据框:
url referrer session_id ts customer
url1 ref1 xxx 2017-09-15 00:00:00 a.com
url2 ref2 yyy 2017-09-15 00:00:00 a.com
url2 ref3 yyy 2017-09-15 00:00:00 a.com
url1 ref1 xxx 2017-09-15 01:00:00 a.com
url2 ref2 yyy 2017-09-15 01:00:00 a.com
我想根据 url 和时间戳对数据进行分组,聚合列值并生成一个如下所示的数据框:
customer url ts page_views visitors referrers
a.com url1 2017-09-15 00:00:00 1 1 [ref1]
a.com url2 2017-09-15 00:00:00 2 2 [ref2, ref3]
在 Spark SQL 中,我可以这样做:
select
customer,
url,
ts,
count(*) as page_views,
count(distinct(session_id)) as visitors,
collect_list(referrer) as referrers
from df
group by customer, url, ts
有什么方法可以使用 Dask 数据帧来实现吗?我试过了,但我只能单独计算聚合列,如下:
# group on timestamp (rounded) and url
grouped = df.groupby(['ts', 'url'])
# calculate page views (count rows in each group)
page_views = grouped.size()
# collect a list of referrer strings per group
referrers = grouped['referrer'].apply(list, meta=('referrers', 'f8'))
# count unique visitors (session ids)
visitors = grouped['session_id'].count()
但我似乎找不到生成我需要的组合数据框的好方法。
以下确实有效:
gb = df.groupby(['customer', 'url', 'ts'])
gb.apply(lambda d: pd.DataFrame({'views': len(d),
'visitiors': d.session_id.count(),
'referrers': [d.referer.tolist()]})).reset_index()
(假设访问者按照上面的 sql 应该是唯一的)
您可能希望定义输出的 meta
。
这是@j-bennet打开的link to the github issue,提供了一个额外的选项。基于这个问题我们实现聚合如下:
custom_agg = dd.Aggregation(
'custom_agg',
lambda s: s.apply(set),
lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),
)
.
为了结合计数代码如下
dfgp = df.groupby(['ID1','ID2'])
df2 = dfgp.assign(cnt=dfgp.size()).agg(custom_agg).reset_index()
我有一个如下所示的 Dask 数据框:
url referrer session_id ts customer
url1 ref1 xxx 2017-09-15 00:00:00 a.com
url2 ref2 yyy 2017-09-15 00:00:00 a.com
url2 ref3 yyy 2017-09-15 00:00:00 a.com
url1 ref1 xxx 2017-09-15 01:00:00 a.com
url2 ref2 yyy 2017-09-15 01:00:00 a.com
我想根据 url 和时间戳对数据进行分组,聚合列值并生成一个如下所示的数据框:
customer url ts page_views visitors referrers
a.com url1 2017-09-15 00:00:00 1 1 [ref1]
a.com url2 2017-09-15 00:00:00 2 2 [ref2, ref3]
在 Spark SQL 中,我可以这样做:
select
customer,
url,
ts,
count(*) as page_views,
count(distinct(session_id)) as visitors,
collect_list(referrer) as referrers
from df
group by customer, url, ts
有什么方法可以使用 Dask 数据帧来实现吗?我试过了,但我只能单独计算聚合列,如下:
# group on timestamp (rounded) and url
grouped = df.groupby(['ts', 'url'])
# calculate page views (count rows in each group)
page_views = grouped.size()
# collect a list of referrer strings per group
referrers = grouped['referrer'].apply(list, meta=('referrers', 'f8'))
# count unique visitors (session ids)
visitors = grouped['session_id'].count()
但我似乎找不到生成我需要的组合数据框的好方法。
以下确实有效:
gb = df.groupby(['customer', 'url', 'ts'])
gb.apply(lambda d: pd.DataFrame({'views': len(d),
'visitiors': d.session_id.count(),
'referrers': [d.referer.tolist()]})).reset_index()
(假设访问者按照上面的 sql 应该是唯一的)
您可能希望定义输出的 meta
。
这是@j-bennet打开的link to the github issue,提供了一个额外的选项。基于这个问题我们实现聚合如下:
custom_agg = dd.Aggregation(
'custom_agg',
lambda s: s.apply(set),
lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),
)
.
为了结合计数代码如下
dfgp = df.groupby(['ID1','ID2'])
df2 = dfgp.assign(cnt=dfgp.size()).agg(custom_agg).reset_index()