使用自定义逻辑处理来自 Redshift 的数十亿条记录

Process several billion records from Redshift using custom logic

我想对放置在 Redshift 中的数据集应用自定义逻辑。 输入数据示例:

userid, event,  fileid, timestamp, ....
100000, start,  120,    2018-09-17 19:11:40
100000, done,   120,    2018-09-17 19:12:40
100000, done,   120,    2018-09-17 19:13:40
100000, start,  500,    2018-09-17 19:13:50
100000, done,   120,    2018-09-17 19:14:40
100000, done,   500,    2018-09-17 19:14:50
100000, done,   120,    2018-09-17 19:15:40

这意味着:

file 120:  start-----done-----done-----done-----done 
file 150:                      start-----done   
time    :  11:40----12:40----13:40-----14:40-----15:40

但它应该看起来像

file 120:  start-----done-----done 
file 150:                      start-----done   
time    :  11:40----12:40----13:40-----14:40-----15:40

文件150一启动就中断了文件120

请记住,这里有很多不同的用户和许多不同的文件。

清理后的数据应该是:

userid, event,  fileid, timestamp, ....
100000, start,  120,    2018-09-17 19:11:40
100000, done,   120,    2018-09-17 19:12:40
100000, done,   120,    2018-09-17 19:13:40
100000, start,  500,    2018-09-17 19:13:50
100000, done,   500,    2018-09-17 19:14:50

同一用户不能同时拥有多个并发文件。因此在第二个开始后,不应从当前数据集中删除第一个的事件。

代码很简单,但在 python 上,并且很容易扩展 Google 数据流,例如,但是将 100GB 以上的数据从 AWS 转移到 GC 并不是一个好主意。

问题 #1: 是否可以在 SQL 上(使用 postgres/redshift 特定功能)或更好地使用 Spark? (但不确定如何在那里实施)

问题 #2: 任何关于可能更好地使用 AWS Batch 或其他任何东西的建议,都会导致 apache beam - 这很简单而且非常明显,但 AWS Batch 如何工作以及如何按块划分数据集(如每个用户的组) - 这是一个大问题。 我的建议是以某种方式将数据从 redshift 卸载到 S3 存储桶中,但以单独的文件=用户的方式划分它,然后如果 aws 批处理支持这个 - 只需提供存储桶并且每个文件应该在已经创建的实例上同时处理。不确定这是否有意义。

如果要删除 fileid 与用户最近的 start 不匹配的行,可以使用 lag(ignore nulls):

select t.*
from (select t.*,
             lag(case when event = 'start' then file_id end ignore nulls) over (partition by userid order by timestamp) as start_fileid
      from t
     ) t
where event = 'start' or start_fileid = fileid;