首先通过 ID 然后在几分钟内聚合 Dask 数据时杀死工作人员

Killed worker when aggregating Dask data first over ID then on minutes

我的目标是首先在 station_id 上聚合 NYC Citibike 数据,然后在 Dask 中的 starttime 分钟内聚合数据。

Dask DataFrame 的头部如下所示,

df_start.head()

显示,

    starttime   start_station_name
start_station_id        
72  2017-08-15 16:02:02 W 52 St & 11 Ave
72  2017-12-01 09:52:20 W 52 St & 11 Ave
72  2017-09-06 12:39:25 W 52 St & 11 Ave
72  2016-05-26 08:41:24 W 52 St & 11 Ave
72  2016-02-28 14:57:16 W 52 St & 11 Ave

Dask 中的聚合,

count_per_station = df_start.groupby(
  ['start_station_id', 
   df_start.starttime.dt.year.rename('year'), 
   df_start.starttime.dt.month.rename('month'), 
   df_start.starttime.dt.hour.rename('hour'),
   df_start.starttime.dt.minute.rename('minute')]).count()

client.persist(count_per_station)
progress(count_per_station, notebook=False)

给出以下输出,

[########################################] | 100% Completed |  0.0s

露头害死工人,

count_per_station.head()

给我一些保姆错误,完整垃圾收集器的警告,最终失败

KilledWorker: ("('dataframe-groupby-count-agg-dc00c500225feb4dc9e32c710613bd1c', 0)", 'tcp://127.0.0.1:49324')

还尝试在 Dask 中执行简单的 groupby 并在 Pandas 中执行困难的 groupby 但在那里出现错误,

def min_counter(b):
    b.groupby([
      b.starttime.dt.year.rename('year'),           
      b.starttime.dt.month.rename('month'), 
      b.starttime.dt.hour.rename('hour'),    
      b.starttime.dt.minute.rename('min')]).count()

df_start.groupby(['start_station_id']).apply(min_counter)

导致,

  /usr/local/anaconda3/envs/bike-demand-forecasting/lib/python3.6/site-packages/ipykernel/__main__.py:4: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result

垃圾收集器再次发出警告,最终导致工人死亡。

理想情况下,我会在 pandas 中使用 resample 方法,但不确定如何在不使工作人员崩溃的情况下有效地使用该方法或上述方法之一。

答案最终是,

def min_counter(b):
    return b.groupby(pd.Grouper(key='starttime', freq='5min')).count()

counter_per_station = df_start.groupby('start_station_id').apply(min_counter)

希望这对面临同样问题的人有所帮助。