使用 Spark 和 Redshift 时如何优化 ETL 数据管道以实现容错?
How to optimize ETL data pipeline for fault tolerance when using Spark and Redshift?
我正在使用 PySpark 编写一个大批量作业,ETL 为 200 tables 并加载到 Amazon Redshift 中。
这 200 个 table 是从一个输入数据源创建的。因此,只有当数据加载到 ALL 200 tables 成功时,批处理作业才成功。批处理作业每天 运行 秒,同时将每个日期的数据附加到 table 秒。
对于容错、可靠性和幂等性,我当前的工作流程如下:
- 使用暂存 tables。使用
CREATE TEMP TABLE LIKE <target_table>
创建临时 Redshift tables
- 将数据转换并加载到暂存中 table。
- 重复 1-2 其他 200 tables。
- 开始
BEGIN
事务。
- 将暂存 table 数据复制到目标 table
使用
INSERT INTO <taget_table> SELECT * FROM <staging_table>
END
交易
DROP
所有分期 tables.
这样我可以保证如果第 3 步失败(这种可能性更大),我不必担心从原始 table 中删除部分数据。相反,我将简单地重新 运行 整个批处理作业,因为临时 table 在 JDBC 断开连接后被丢弃。
虽然它解决了大部分问题,但它并不优雅、粗糙并且会消耗额外的时间。我想如果 Spark and/or Redshift 提供标准工具来解决 ETL 世界中这个非常普遍的问题。
谢谢
COPY 命令可以在事务块中。你只需要:
- 开始
- 将数据复制到所有 table
- 提交(如果成功)
Redshift 将为所有其他查看者保留 tables 的先前版本,他们对 tables 的看法在 COMMIT 之前不会改变。
您布置的进程的好处是,在事务 运行 期间,其他进程无法获得 table 上的独占锁(ALTER TABLE ETC)。您的插入将 运行 比 COPY 快,因此交易打开的时间会更短。这只是一个问题,如果其他进程正在修改 table 同时 ETL 是 运行ning,这通常不是一个好主意。
我正在使用 PySpark 编写一个大批量作业,ETL 为 200 tables 并加载到 Amazon Redshift 中。 这 200 个 table 是从一个输入数据源创建的。因此,只有当数据加载到 ALL 200 tables 成功时,批处理作业才成功。批处理作业每天 运行 秒,同时将每个日期的数据附加到 table 秒。
对于容错、可靠性和幂等性,我当前的工作流程如下:
- 使用暂存 tables。使用
CREATE TEMP TABLE LIKE <target_table>
创建临时 Redshift tables
- 将数据转换并加载到暂存中 table。
- 重复 1-2 其他 200 tables。
- 开始
BEGIN
事务。 - 将暂存 table 数据复制到目标 table
使用
INSERT INTO <taget_table> SELECT * FROM <staging_table>
END
交易DROP
所有分期 tables.
这样我可以保证如果第 3 步失败(这种可能性更大),我不必担心从原始 table 中删除部分数据。相反,我将简单地重新 运行 整个批处理作业,因为临时 table 在 JDBC 断开连接后被丢弃。
虽然它解决了大部分问题,但它并不优雅、粗糙并且会消耗额外的时间。我想如果 Spark and/or Redshift 提供标准工具来解决 ETL 世界中这个非常普遍的问题。
谢谢
COPY 命令可以在事务块中。你只需要:
- 开始
- 将数据复制到所有 table
- 提交(如果成功)
Redshift 将为所有其他查看者保留 tables 的先前版本,他们对 tables 的看法在 COMMIT 之前不会改变。
您布置的进程的好处是,在事务 运行 期间,其他进程无法获得 table 上的独占锁(ALTER TABLE ETC)。您的插入将 运行 比 COPY 快,因此交易打开的时间会更短。这只是一个问题,如果其他进程正在修改 table 同时 ETL 是 运行ning,这通常不是一个好主意。