"UPSERT" 使用 Kinesis Firehose 的 Redshift table

"UPSERT" a Redshift table using Kinesis Firehose

我正在设置 Kinesis Data Stream -> Firehose -> Redshift 数据管道。

Docs 说 Firehose 可能会引入重复项。

为了解决这个问题,我尝试使用 here 中的“upsert”方法。

是否可以通过 Firehose COPY 命令设置事务调用?

最佳做法是什么?

提前致谢!

有用代码:

create table target 
(
    key BIGINT,
    value VARCHAR
);
{"key": 1, "value": "stub 1"}

begin transaction;
    
    -- create a stage table like a target table
    create temp table stage (like target);

    -- populate the stage table
    COPY stage FROM 's3://<bucket-name>/<manifests-folder>/' 
    CREDENTIALS 'aws_iam_role=arn:aws:iam::<iam>:role/<role>' 
    MANIFEST format as json 'auto';

    -- delete duplicates from the target table
    delete from target using stage 
    where target.key = stage.key; 

    -- populate the target table
    insert into target
    (select * from stage);

    -- drop the stage table
    drop table stage;

end transaction;

是的,您可以在 Firehose 上设置触发器,但我建议您不要这样做。如果您使用的是 Firehose,您会很快收到数据,并且 Firehose 会根据需要复制到 Redshift。出现的问题是您真正希望 tables 在 Redshift 中改变的速度有多快。 Redshift 很大,但您希望将增量数据添加到事实 table 的速度有一些实际限制。该速率将取决于摄取后需要进行哪些转换、您能够负担得起清理/分析的频率、您的查询负载/复杂性有多大等。通常可以每 5 分钟刷新一次。在合适的条件下,更快也是可行的。

因此,在暂存 table 中收集所有新数据并将其每 1-5 分钟添加到目标 table 的过程是以独立速率执行此操作的方法。 CloudWatch 可以在您喜欢的任何时间间隔触发 Lambda,Lambda 可以向 Redshift 发出 SQL。 SQL 看起来像:

  1. 重命名暂存 table 并创建一个新的暂存 table 交易
  2. 在一个新的事务中并独占地锁定重命名的分段 table 以便在进程继续之前完成所有提交
  3. 将重命名的分段 table 引入目标 table 就像您在上面的事务中所做的那样
  4. 删除重命名的分段 table

关键是使用 Redshift 一致性来确保 firehose 和 lambda 这两个进程没有数据丢失或重复,运行 以独立的速率。每个 COPY 在暂存 table 重命名事务的 COMMIT 之前或之后开始。如果之前,传入的数据进入重命名的暂存 table,我们需要确保在 COPY 完成并提交之前不会开始对目标 table 的摄取。如果之后,数据将进入新的暂存区 table。我会在重命名的暂存 table 的名称中添加一个随机数,这样只有 Lambda 会使用这个名称——如果摄取速率对于大量传入数据来说太高,这可以防止名称冲突,并且这些流程重叠。