使用 Redshift Copy 命令进行合并

Use Redshift Copy command to do a merge

我有一个迭代输入并将数据输出到 AWS Firehose 的过程,我已将其配置为上传到我创建的 redshift table。一个问题是有时行可能会重复,因为流程需要重新评估数据。 类似于:

Event_date, event_id, event_cost
2015-06-25, 123, 3
2015-06-25, 123, 4

http://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html

看那里,我想用新值替换旧行,比如:

insert into event_table_staging  
select event_date,event_id, event_cost from <s3 location>;

delete from event_table  
using event_table_staging  
where event_table.event_id = event_table_staging.event_id;

insert into target 
select * from event_table_staging;

delete from event_table_staging  
select * from event_table_staging;

是否可以这样做:

Redshift columns: event_date,event_id,cost
copy event_table from <s3> 
(update event_table 
select c_source.event_date,c_source.event_id,c_source.cost from <s3 source> as c_source join event_table on c_source.event_id = event_table.event_id) 
CSV


copy event_table from <s3> 
(insert into event_table 
select c_source.event_date,c_source.event_id,c_source.cost from event_table left outer join<s3 source> as c_source join on c_source.event_id = event_table.event_id where c_source.event_id is NULL) 
CSV

您不能直接从 COPY 进行合并。

然而,您的初始方法可以包含在使用临时 table 的事务中以暂存加载数据以获得最佳性能。

BEGIN
;
CREATE TEMP TABLE event_table_staging (
     event_date  TIMESTAMP  NULL
    ,event_id    BIGINT     NULL
    ,event_cost  INTEGER    NULL )
DISTSTYLE KEY
DISTKEY (event_id)
SORTKEY (event_id)
;
COPY event_table_staging  
FROM <s3 location>
COMPUDATE ON
;
UPDATE event_table  
SET    event_date = new.event_date
      ,event_cost = new.event_cost
FROM        event_table         AS trg
INNER JOIN  event_table_staging AS new
        ON  trg.event_id = new.event_id
WHERE COALESCE(trg.event_date,0) <> COALESCE(new.event_date,0)
  AND COALESCE(trg.event_cost,0) <> COALESCE(new.event_cost,0)
;
INSERT INTO event_table 
SELECT  event_date
       ,event_id  
       ,event_cost
FROM        event_table_staging AS new
LEFT JOIN   event_table         AS trg
       ON   trg.event_id = new.event_id
WHERE trg.event_id IS NULL
;
COMMIT
;

只要您使用事务并且更新总量相对低(个位数百分比),这种方法实际上表现得非常好。唯一需要注意的是,您的目标需要定期 VACUUMed - 一个月一次对我们来说就足够了。

我们每小时在数百万行范围内的几个 table 执行此操作,即,数百万行的数百行合并为数百万行的数百行。合并后的 table 的用户查询仍然表现良好。

Redshift 针对以经济高效的方式处理大量数据进行了优化,您需要改变一些关于您从其他数据库获得的数据和数据库的想法。

主要概念是您不应在 Redshift 中更新数据。您应该将 Redshift 中的数据视为 "Log"。您可以将函数用作 INSERT 或 UPDATE,但它们会极大地限制您可以处理的数据量。

您可以通过多种方式处理重复项:

  • 您可以首先防止写入重复项,方法是管理所有 ID 的一些内存中查找 tables(例如在 ElastiCache 上的 Redis 中)您正在处理并忽略一条记录,如果您已经处理过它

  • 您可以在 Redshift 中保留重复项并使用 WINDOW functions that will take only one of the records (LAST_VALUE 处理这些记录)。

  • 您可以在 Redshift 中拥有原始事件并在对数据库的查询中进行聚合,而不是将其作为预处理进行。此模式还可以灵活地更改聚合数据的方式。 Redshift 使用这些聚合可以非常快,几乎不需要预聚合。

如果您仍想在 Redshift 中拥有 "clean" 和聚合数据,您可以 UNLOAD 使用一些 SQL 查询和正确的聚合或 WINDOW 查询该数据函数,删除旧的 table 并将数据复制回 Redshift。