"UPSERT" 使用 Kinesis Firehose 的 Redshift table
"UPSERT" a Redshift table using Kinesis Firehose
我正在设置 Kinesis Data Stream -> Firehose -> Redshift
数据管道。
Docs 说 Firehose 可能会引入重复项。
为了解决这个问题,我尝试使用 here 中的“upsert”方法。
是否可以通过 Firehose COPY 命令设置事务调用?
最佳做法是什么?
提前致谢!
有用代码:
- 目标table创作
create table target
(
key BIGINT,
value VARCHAR
);
- 数据格式
{"key": 1, "value": "stub 1"}
- upsert 事务
begin transaction;
-- create a stage table like a target table
create temp table stage (like target);
-- populate the stage table
COPY stage FROM 's3://<bucket-name>/<manifests-folder>/'
CREDENTIALS 'aws_iam_role=arn:aws:iam::<iam>:role/<role>'
MANIFEST format as json 'auto';
-- delete duplicates from the target table
delete from target using stage
where target.key = stage.key;
-- populate the target table
insert into target
(select * from stage);
-- drop the stage table
drop table stage;
end transaction;
是的,您可以在 Firehose 上设置触发器,但我建议您不要这样做。如果您使用的是 Firehose,您会很快收到数据,并且 Firehose 会根据需要复制到 Redshift。出现的问题是您真正希望 tables 在 Redshift 中改变的速度有多快。 Redshift 很大,但您希望将增量数据添加到事实 table 的速度有一些实际限制。该速率将取决于摄取后需要进行哪些转换、您能够负担得起清理/分析的频率、您的查询负载/复杂性有多大等。通常可以每 5 分钟刷新一次。在合适的条件下,更快也是可行的。
因此,在暂存 table 中收集所有新数据并将其每 1-5 分钟添加到目标 table 的过程是以独立速率执行此操作的方法。 CloudWatch 可以在您喜欢的任何时间间隔触发 Lambda,Lambda 可以向 Redshift 发出 SQL。 SQL 看起来像:
- 重命名暂存 table 并创建一个新的暂存 table
交易
- 在一个新的事务中并独占地锁定重命名的分段 table 以便在进程继续之前完成所有提交
- 将重命名的分段 table 引入目标 table 就像您在上面的事务中所做的那样
- 删除重命名的分段 table
关键是使用 Redshift 一致性来确保 firehose 和 lambda 这两个进程没有数据丢失或重复,运行 以独立的速率。每个 COPY 在暂存 table 重命名事务的 COMMIT 之前或之后开始。如果之前,传入的数据进入重命名的暂存 table,我们需要确保在 COPY 完成并提交之前不会开始对目标 table 的摄取。如果之后,数据将进入新的暂存区 table。我会在重命名的暂存 table 的名称中添加一个随机数,这样只有 Lambda 会使用这个名称——如果摄取速率对于大量传入数据来说太高,这可以防止名称冲突,并且这些流程重叠。
我正在设置 Kinesis Data Stream -> Firehose -> Redshift
数据管道。
Docs 说 Firehose 可能会引入重复项。
为了解决这个问题,我尝试使用 here 中的“upsert”方法。
是否可以通过 Firehose COPY 命令设置事务调用?
最佳做法是什么?
提前致谢!
有用代码:
- 目标table创作
create table target
(
key BIGINT,
value VARCHAR
);
- 数据格式
{"key": 1, "value": "stub 1"}
- upsert 事务
begin transaction;
-- create a stage table like a target table
create temp table stage (like target);
-- populate the stage table
COPY stage FROM 's3://<bucket-name>/<manifests-folder>/'
CREDENTIALS 'aws_iam_role=arn:aws:iam::<iam>:role/<role>'
MANIFEST format as json 'auto';
-- delete duplicates from the target table
delete from target using stage
where target.key = stage.key;
-- populate the target table
insert into target
(select * from stage);
-- drop the stage table
drop table stage;
end transaction;
是的,您可以在 Firehose 上设置触发器,但我建议您不要这样做。如果您使用的是 Firehose,您会很快收到数据,并且 Firehose 会根据需要复制到 Redshift。出现的问题是您真正希望 tables 在 Redshift 中改变的速度有多快。 Redshift 很大,但您希望将增量数据添加到事实 table 的速度有一些实际限制。该速率将取决于摄取后需要进行哪些转换、您能够负担得起清理/分析的频率、您的查询负载/复杂性有多大等。通常可以每 5 分钟刷新一次。在合适的条件下,更快也是可行的。
因此,在暂存 table 中收集所有新数据并将其每 1-5 分钟添加到目标 table 的过程是以独立速率执行此操作的方法。 CloudWatch 可以在您喜欢的任何时间间隔触发 Lambda,Lambda 可以向 Redshift 发出 SQL。 SQL 看起来像:
- 重命名暂存 table 并创建一个新的暂存 table 交易
- 在一个新的事务中并独占地锁定重命名的分段 table 以便在进程继续之前完成所有提交
- 将重命名的分段 table 引入目标 table 就像您在上面的事务中所做的那样
- 删除重命名的分段 table
关键是使用 Redshift 一致性来确保 firehose 和 lambda 这两个进程没有数据丢失或重复,运行 以独立的速率。每个 COPY 在暂存 table 重命名事务的 COMMIT 之前或之后开始。如果之前,传入的数据进入重命名的暂存 table,我们需要确保在 COPY 完成并提交之前不会开始对目标 table 的摄取。如果之后,数据将进入新的暂存区 table。我会在重命名的暂存 table 的名称中添加一个随机数,这样只有 Lambda 会使用这个名称——如果摄取速率对于大量传入数据来说太高,这可以防止名称冲突,并且这些流程重叠。