首先通过 ID 然后在几分钟内聚合 Dask 数据时杀死工作人员
Killed worker when aggregating Dask data first over ID then on minutes
我的目标是首先在 station_id
上聚合 NYC Citibike 数据,然后在 Dask 中的 starttime
分钟内聚合数据。
Dask DataFrame 的头部如下所示,
df_start.head()
显示,
starttime start_station_name
start_station_id
72 2017-08-15 16:02:02 W 52 St & 11 Ave
72 2017-12-01 09:52:20 W 52 St & 11 Ave
72 2017-09-06 12:39:25 W 52 St & 11 Ave
72 2016-05-26 08:41:24 W 52 St & 11 Ave
72 2016-02-28 14:57:16 W 52 St & 11 Ave
Dask 中的聚合,
count_per_station = df_start.groupby(
['start_station_id',
df_start.starttime.dt.year.rename('year'),
df_start.starttime.dt.month.rename('month'),
df_start.starttime.dt.hour.rename('hour'),
df_start.starttime.dt.minute.rename('minute')]).count()
client.persist(count_per_station)
progress(count_per_station, notebook=False)
给出以下输出,
[########################################] | 100% Completed | 0.0s
露头害死工人,
count_per_station.head()
给我一些保姆错误,完整垃圾收集器的警告,最终失败
KilledWorker: ("('dataframe-groupby-count-agg-dc00c500225feb4dc9e32c710613bd1c', 0)", 'tcp://127.0.0.1:49324')
还尝试在 Dask 中执行简单的 groupby 并在 Pandas 中执行困难的 groupby 但在那里出现错误,
def min_counter(b):
b.groupby([
b.starttime.dt.year.rename('year'),
b.starttime.dt.month.rename('month'),
b.starttime.dt.hour.rename('hour'),
b.starttime.dt.minute.rename('min')]).count()
df_start.groupby(['start_station_id']).apply(min_counter)
导致,
/usr/local/anaconda3/envs/bike-demand-forecasting/lib/python3.6/site-packages/ipykernel/__main__.py:4: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
Before: .apply(func)
After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
or: .apply(func, meta=('x', 'f8')) for series result
垃圾收集器再次发出警告,最终导致工人死亡。
理想情况下,我会在 pandas 中使用 resample 方法,但不确定如何在不使工作人员崩溃的情况下有效地使用该方法或上述方法之一。
答案最终是,
def min_counter(b):
return b.groupby(pd.Grouper(key='starttime', freq='5min')).count()
counter_per_station = df_start.groupby('start_station_id').apply(min_counter)
希望这对面临同样问题的人有所帮助。
我的目标是首先在 station_id
上聚合 NYC Citibike 数据,然后在 Dask 中的 starttime
分钟内聚合数据。
Dask DataFrame 的头部如下所示,
df_start.head()
显示,
starttime start_station_name
start_station_id
72 2017-08-15 16:02:02 W 52 St & 11 Ave
72 2017-12-01 09:52:20 W 52 St & 11 Ave
72 2017-09-06 12:39:25 W 52 St & 11 Ave
72 2016-05-26 08:41:24 W 52 St & 11 Ave
72 2016-02-28 14:57:16 W 52 St & 11 Ave
Dask 中的聚合,
count_per_station = df_start.groupby(
['start_station_id',
df_start.starttime.dt.year.rename('year'),
df_start.starttime.dt.month.rename('month'),
df_start.starttime.dt.hour.rename('hour'),
df_start.starttime.dt.minute.rename('minute')]).count()
client.persist(count_per_station)
progress(count_per_station, notebook=False)
给出以下输出,
[########################################] | 100% Completed | 0.0s
露头害死工人,
count_per_station.head()
给我一些保姆错误,完整垃圾收集器的警告,最终失败
KilledWorker: ("('dataframe-groupby-count-agg-dc00c500225feb4dc9e32c710613bd1c', 0)", 'tcp://127.0.0.1:49324')
还尝试在 Dask 中执行简单的 groupby 并在 Pandas 中执行困难的 groupby 但在那里出现错误,
def min_counter(b):
b.groupby([
b.starttime.dt.year.rename('year'),
b.starttime.dt.month.rename('month'),
b.starttime.dt.hour.rename('hour'),
b.starttime.dt.minute.rename('min')]).count()
df_start.groupby(['start_station_id']).apply(min_counter)
导致,
/usr/local/anaconda3/envs/bike-demand-forecasting/lib/python3.6/site-packages/ipykernel/__main__.py:4: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
Before: .apply(func)
After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
or: .apply(func, meta=('x', 'f8')) for series result
垃圾收集器再次发出警告,最终导致工人死亡。
理想情况下,我会在 pandas 中使用 resample 方法,但不确定如何在不使工作人员崩溃的情况下有效地使用该方法或上述方法之一。
答案最终是,
def min_counter(b):
return b.groupby(pd.Grouper(key='starttime', freq='5min')).count()
counter_per_station = df_start.groupby('start_station_id').apply(min_counter)
希望这对面临同样问题的人有所帮助。