使用自定义逻辑处理来自 Redshift 的数十亿条记录
Process several billion records from Redshift using custom logic
我想对放置在 Redshift 中的数据集应用自定义逻辑。
输入数据示例:
userid, event, fileid, timestamp, ....
100000, start, 120, 2018-09-17 19:11:40
100000, done, 120, 2018-09-17 19:12:40
100000, done, 120, 2018-09-17 19:13:40
100000, start, 500, 2018-09-17 19:13:50
100000, done, 120, 2018-09-17 19:14:40
100000, done, 500, 2018-09-17 19:14:50
100000, done, 120, 2018-09-17 19:15:40
这意味着:
file 120: start-----done-----done-----done-----done
file 150: start-----done
time : 11:40----12:40----13:40-----14:40-----15:40
但它应该看起来像
file 120: start-----done-----done
file 150: start-----done
time : 11:40----12:40----13:40-----14:40-----15:40
文件150一启动就中断了文件120
请记住,这里有很多不同的用户和许多不同的文件。
清理后的数据应该是:
userid, event, fileid, timestamp, ....
100000, start, 120, 2018-09-17 19:11:40
100000, done, 120, 2018-09-17 19:12:40
100000, done, 120, 2018-09-17 19:13:40
100000, start, 500, 2018-09-17 19:13:50
100000, done, 500, 2018-09-17 19:14:50
同一用户不能同时拥有多个并发文件。因此在第二个开始后,不应从当前数据集中删除第一个的事件。
代码很简单,但在 python 上,并且很容易扩展 Google 数据流,例如,但是将 100GB 以上的数据从 AWS 转移到 GC 并不是一个好主意。
问题 #1:
是否可以在 SQL 上(使用 postgres/redshift 特定功能)或更好地使用 Spark? (但不确定如何在那里实施)
问题 #2:
任何关于可能更好地使用 AWS Batch 或其他任何东西的建议,都会导致 apache beam - 这很简单而且非常明显,但 AWS Batch 如何工作以及如何按块划分数据集(如每个用户的组) - 这是一个大问题。
我的建议是以某种方式将数据从 redshift 卸载到 S3 存储桶中,但以单独的文件=用户的方式划分它,然后如果 aws 批处理支持这个 - 只需提供存储桶并且每个文件应该在已经创建的实例上同时处理。不确定这是否有意义。
如果要删除 fileid
与用户最近的 start
不匹配的行,可以使用 lag(ignore nulls)
:
select t.*
from (select t.*,
lag(case when event = 'start' then file_id end ignore nulls) over (partition by userid order by timestamp) as start_fileid
from t
) t
where event = 'start' or start_fileid = fileid;
我想对放置在 Redshift 中的数据集应用自定义逻辑。 输入数据示例:
userid, event, fileid, timestamp, ....
100000, start, 120, 2018-09-17 19:11:40
100000, done, 120, 2018-09-17 19:12:40
100000, done, 120, 2018-09-17 19:13:40
100000, start, 500, 2018-09-17 19:13:50
100000, done, 120, 2018-09-17 19:14:40
100000, done, 500, 2018-09-17 19:14:50
100000, done, 120, 2018-09-17 19:15:40
这意味着:
file 120: start-----done-----done-----done-----done
file 150: start-----done
time : 11:40----12:40----13:40-----14:40-----15:40
但它应该看起来像
file 120: start-----done-----done
file 150: start-----done
time : 11:40----12:40----13:40-----14:40-----15:40
文件150一启动就中断了文件120
请记住,这里有很多不同的用户和许多不同的文件。
清理后的数据应该是:
userid, event, fileid, timestamp, ....
100000, start, 120, 2018-09-17 19:11:40
100000, done, 120, 2018-09-17 19:12:40
100000, done, 120, 2018-09-17 19:13:40
100000, start, 500, 2018-09-17 19:13:50
100000, done, 500, 2018-09-17 19:14:50
同一用户不能同时拥有多个并发文件。因此在第二个开始后,不应从当前数据集中删除第一个的事件。
代码很简单,但在 python 上,并且很容易扩展 Google 数据流,例如,但是将 100GB 以上的数据从 AWS 转移到 GC 并不是一个好主意。
问题 #1: 是否可以在 SQL 上(使用 postgres/redshift 特定功能)或更好地使用 Spark? (但不确定如何在那里实施)
问题 #2: 任何关于可能更好地使用 AWS Batch 或其他任何东西的建议,都会导致 apache beam - 这很简单而且非常明显,但 AWS Batch 如何工作以及如何按块划分数据集(如每个用户的组) - 这是一个大问题。 我的建议是以某种方式将数据从 redshift 卸载到 S3 存储桶中,但以单独的文件=用户的方式划分它,然后如果 aws 批处理支持这个 - 只需提供存储桶并且每个文件应该在已经创建的实例上同时处理。不确定这是否有意义。
如果要删除 fileid
与用户最近的 start
不匹配的行,可以使用 lag(ignore nulls)
:
select t.*
from (select t.*,
lag(case when event = 'start' then file_id end ignore nulls) over (partition by userid order by timestamp) as start_fileid
from t
) t
where event = 'start' or start_fileid = fileid;