Undo/rollback 数据处理管道的影响
Undo/rollback the effects of a data processing pipeline
我有一个工作流程,我将描述如下:
[ Dump(query) ] ---+
|
+---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
|
[ Schema(query) ] ---+
其中:
query
是对 RDBMS 的查询
Dump
将结果 query
转储到 CSV 文件 dump
Schema
运行s query
和 xcoms 其架构 schema
Parquet
读取 csv
并使用 schema
创建 Parquet 文件 parquet
Hive
基于 Parquet 文件 parquet
创建 Hive table
这种复杂的工作流程背后的原因是由于无法解决且超出问题范围的限制(但是,理想情况下它会比这简单得多)。
我的问题是关于在发生故障时回滚管道的影响。
这些是我希望在不同条件下发生的回滚:
dump
应始终删除,无论管道的最终结果如何
parquet
如果出于某种原因,Hive table 创建失败 ,则应删除 parquet
在工作流程中表示这一点,我可能会这样写:
[ Dump(query) ] ---+
|
+---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
| | |
[ Schema(query) ] ---+ | |
v v
[ DeleteParquetOutput ] --> [ DeleteDumpOutput ]
只有在发生错误时才会执行从 Parquet
到 DeleteParquetOutput
的转换,并且转换到 DeleteDumpOutput
时会忽略其依赖项中的任何故障。
这应该可以解决它,但是 我相信更复杂的管道可能会因这种错误处理逻辑而增加复杂性,从而受到很大影响。
在继续讨论更多细节之前,我的问题是:在处理 Airflow 管道中的错误时,这可以被视为一种好的做法吗?有什么不同的(可能更可持续的)方法?
如果您对我想如何解决这个问题更感兴趣,请继续阅读,否则请随时回答 and/or 评论。
我对管道中错误处理的看法
理想情况下,我想做的是:
- 为每个相关阶段定义回滚过程
- 对于每个回滚过程,定义它是只在失败的情况下发生还是在任何情况下都发生
- 当管道完成时,反转依赖关系,并从最后一个成功的任务开始,遍历反转的 DAG 和 运行 相关的回滚过程(如果适用)
- 应该记录回滚过程中的错误,但不考虑以完成整个管道的回滚
- 为了保持上一个点,每个任务应该定义一个单独的效果,其回滚过程可以在不引用其他任务的情况下进行描述
让我们用给定的管道做几个例子。
场景 1:成功
我们反转 DAG 并用它的强制回滚过程(如果有的话)填充每个任务,得到这个
+---> [ Dump: UNDO ]
|
[ Hive: None ] ---> [ Parquet: None ] ---+
^ |
| +---> [ Schema: None ]
+--- Start here
场景二:失败发生在Hive
+---> [ Dump: UNDO ]
|
[ Hive: None ] ---> [ Parquet: UNDO (error) ] ---+
^ |
| +---> [ Schema: None ]
+--- Start here
有什么方法可以在 Airflow 中表示这样的东西吗?我也乐于评估不同的工作流自动化解决方案,如果它们支持这种方法的话。
似乎是一种复杂的错误处理方式。我认为最好将错误视为简单地停止 DAG 的当前 运行,这样您就可以修复任何问题并从它停止的地方 re-start 它。当然,您可以清理由特定任务创建的部分创建的文件,但我不会仅仅因为某些下游问题而取消整个管道。
以我们在我工作的地方所做的为例,诚然它使用不同的技术,但我认为相同的工作流程:
- 从特定时间间隔的源数据库中提取增量并将其压缩到 Airlfow 工作服务器上
- 将此压缩文件移动到 S3 位置
- 将 S3 文件复制到 Snowflake 数据仓库中。
使用我们当前的设置 - 如果有人不小心更改了我们将 S3 文件加载到 Snowflake 的结构 table,唯一失败的任务是自 table 结构不再匹配 CSV 结构。要解决这个问题,我们只需要将 table 的结构恢复到原来的状态,并 re-run 失败的任务。 Airflow 然后 re-copy 文件从 S3 进入 Snowflake 并成功。
按照您提出的设置会发生什么?如果最后一个任务失败,它将 roll-back 整个管道并从 s3 存储桶中删除 CSV 文件;我们将不得不再次从源数据库下载文件。如果我们简单地 re-ran 从 s3 复制到 Snowflake 的任务会更好,从而省去必须 运行 整个 DAG 的麻烦。
BaseOperator
class, which all operators and sensors derive from, supports callbacks: on_success_callback
, on_retry_callback
and on_failure_callback
-- 也许这些会有所帮助。
我有一个工作流程,我将描述如下:
[ Dump(query) ] ---+
|
+---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
|
[ Schema(query) ] ---+
其中:
query
是对 RDBMS 的查询
Dump
将结果query
转储到 CSV 文件dump
Schema
运行squery
和 xcoms 其架构schema
Parquet
读取csv
并使用schema
创建 Parquet 文件parquet
Hive
基于 Parquet 文件parquet
创建 Hive table
这种复杂的工作流程背后的原因是由于无法解决且超出问题范围的限制(但是,理想情况下它会比这简单得多)。
我的问题是关于在发生故障时回滚管道的影响。
这些是我希望在不同条件下发生的回滚:
dump
应始终删除,无论管道的最终结果如何parquet
如果出于某种原因,Hive table 创建失败 ,则应删除 parquet
在工作流程中表示这一点,我可能会这样写:
[ Dump(query) ] ---+
|
+---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
| | |
[ Schema(query) ] ---+ | |
v v
[ DeleteParquetOutput ] --> [ DeleteDumpOutput ]
只有在发生错误时才会执行从 Parquet
到 DeleteParquetOutput
的转换,并且转换到 DeleteDumpOutput
时会忽略其依赖项中的任何故障。
这应该可以解决它,但是 我相信更复杂的管道可能会因这种错误处理逻辑而增加复杂性,从而受到很大影响。
在继续讨论更多细节之前,我的问题是:在处理 Airflow 管道中的错误时,这可以被视为一种好的做法吗?有什么不同的(可能更可持续的)方法?
如果您对我想如何解决这个问题更感兴趣,请继续阅读,否则请随时回答 and/or 评论。
我对管道中错误处理的看法
理想情况下,我想做的是:
- 为每个相关阶段定义回滚过程
- 对于每个回滚过程,定义它是只在失败的情况下发生还是在任何情况下都发生
- 当管道完成时,反转依赖关系,并从最后一个成功的任务开始,遍历反转的 DAG 和 运行 相关的回滚过程(如果适用)
- 应该记录回滚过程中的错误,但不考虑以完成整个管道的回滚
- 为了保持上一个点,每个任务应该定义一个单独的效果,其回滚过程可以在不引用其他任务的情况下进行描述
让我们用给定的管道做几个例子。
场景 1:成功
我们反转 DAG 并用它的强制回滚过程(如果有的话)填充每个任务,得到这个
+---> [ Dump: UNDO ]
|
[ Hive: None ] ---> [ Parquet: None ] ---+
^ |
| +---> [ Schema: None ]
+--- Start here
场景二:失败发生在Hive
+---> [ Dump: UNDO ]
|
[ Hive: None ] ---> [ Parquet: UNDO (error) ] ---+
^ |
| +---> [ Schema: None ]
+--- Start here
有什么方法可以在 Airflow 中表示这样的东西吗?我也乐于评估不同的工作流自动化解决方案,如果它们支持这种方法的话。
似乎是一种复杂的错误处理方式。我认为最好将错误视为简单地停止 DAG 的当前 运行,这样您就可以修复任何问题并从它停止的地方 re-start 它。当然,您可以清理由特定任务创建的部分创建的文件,但我不会仅仅因为某些下游问题而取消整个管道。
以我们在我工作的地方所做的为例,诚然它使用不同的技术,但我认为相同的工作流程:
- 从特定时间间隔的源数据库中提取增量并将其压缩到 Airlfow 工作服务器上
- 将此压缩文件移动到 S3 位置
- 将 S3 文件复制到 Snowflake 数据仓库中。
使用我们当前的设置 - 如果有人不小心更改了我们将 S3 文件加载到 Snowflake 的结构 table,唯一失败的任务是自 table 结构不再匹配 CSV 结构。要解决这个问题,我们只需要将 table 的结构恢复到原来的状态,并 re-run 失败的任务。 Airflow 然后 re-copy 文件从 S3 进入 Snowflake 并成功。
按照您提出的设置会发生什么?如果最后一个任务失败,它将 roll-back 整个管道并从 s3 存储桶中删除 CSV 文件;我们将不得不再次从源数据库下载文件。如果我们简单地 re-ran 从 s3 复制到 Snowflake 的任务会更好,从而省去必须 运行 整个 DAG 的麻烦。
BaseOperator
class, which all operators and sensors derive from, supports callbacks: on_success_callback
, on_retry_callback
and on_failure_callback
-- 也许这些会有所帮助。