如何 Crash/Stop DataFlow Pub/Sub BigQuery 插入错误时的摄取
How to Crash/Stop DataFlow Pub/Sub Ingestion on BigQuery Insert Error
我正在寻找一种方法,使 Google DataFlow 作业在发生(特定)异常时停止从 Pub/Sub 摄取。
来自 Pub/Sub 的事件通过 PubsubIO.Read.Bound<TableRow>
使用 TableRowJsonCoder
JSON 读取并直接流式传输到 BigQuery
BigQueryIO.Write.Bound
。
(中间有一个 ParDo
可以更改一个字段的内容和一些按天发生的自定义分区,但这与此目的无关。)
当 events/rows 从 PubSub 中提取的字段不是目标 BigQuery table 中的列时,DataFlow 作业在 运行 时记录 IOExceptions 声称它无法插入行,但似乎承认这些消息并继续 运行ning。
我想做的是停止从 Pub/Sub and/or 中摄取消息,使数据流作业崩溃,这样警报就可以基于最旧的未确认消息的年龄。至少我想确保那些 Pub/Sub 未能插入到 BigQuery 的消息不会被确认,以便我可以解决问题,重新启动数据流作业并再次使用这些消息。
我知道这里描述了一种处理错误输入的建议解决方案:https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow
我也知道 Apache Beam 上的这个 PR,它允许插入没有违规字段的行:
https://github.com/apache/beam/pull/1778
但是在我的例子中,我真的不想防止错误输入,而是防止程序员错误,即新字段被添加到 JSON 消息中,这些消息被推送到 Pub/Sub,但相应的 DataFlow 作业未更新。所以我真的没有错误的数据,我只是想在程序员犯错误时崩溃,因为在更改消息格式之前没有部署新的数据流作业。
我假设可以(类似于博客 post 解决方案)创建一个自定义 ParDo
来验证每一行并抛出未捕获并导致崩溃的异常.
但理想情况下,我只希望有一些配置不处理插入错误并记录它,而只是使作业崩溃或至少停止摄取。
您可以有一个 ParDo 和一个位于 BQ 写入之前的 DoFn。 DoFn 将负责每隔 X 分钟获取输出 table 模式,并验证要写入的每条记录是否与预期的输出模式匹配(如果不匹配则抛出异常)。
Old Pipeline:
PubSub -> Some Transforms -> BQ Sink
New Pipeline:
PubSub -> Some Transforms -> ParDo(BQ Sink Validator) -> BQ Sink
这样做的好处是,一旦有人修复了输出 table 架构,管道就会恢复。您需要抛出一个很好的错误消息,说明传入的 PubSub 消息有什么问题。
或者,您可以让 BQ Sink Validator
将消息输出到 PubSub DLQ(监控其大小)。在操作上,您必须更新 table,然后将 DLQ 作为输入重新摄取。这样做的好处是只有坏消息才会阻止管道执行。
我正在寻找一种方法,使 Google DataFlow 作业在发生(特定)异常时停止从 Pub/Sub 摄取。
来自 Pub/Sub 的事件通过 PubsubIO.Read.Bound<TableRow>
使用 TableRowJsonCoder
JSON 读取并直接流式传输到 BigQuery
BigQueryIO.Write.Bound
。
(中间有一个 ParDo
可以更改一个字段的内容和一些按天发生的自定义分区,但这与此目的无关。)
当 events/rows 从 PubSub 中提取的字段不是目标 BigQuery table 中的列时,DataFlow 作业在 运行 时记录 IOExceptions 声称它无法插入行,但似乎承认这些消息并继续 运行ning。
我想做的是停止从 Pub/Sub and/or 中摄取消息,使数据流作业崩溃,这样警报就可以基于最旧的未确认消息的年龄。至少我想确保那些 Pub/Sub 未能插入到 BigQuery 的消息不会被确认,以便我可以解决问题,重新启动数据流作业并再次使用这些消息。
我知道这里描述了一种处理错误输入的建议解决方案:https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow
我也知道 Apache Beam 上的这个 PR,它允许插入没有违规字段的行: https://github.com/apache/beam/pull/1778
但是在我的例子中,我真的不想防止错误输入,而是防止程序员错误,即新字段被添加到 JSON 消息中,这些消息被推送到 Pub/Sub,但相应的 DataFlow 作业未更新。所以我真的没有错误的数据,我只是想在程序员犯错误时崩溃,因为在更改消息格式之前没有部署新的数据流作业。
我假设可以(类似于博客 post 解决方案)创建一个自定义 ParDo
来验证每一行并抛出未捕获并导致崩溃的异常.
但理想情况下,我只希望有一些配置不处理插入错误并记录它,而只是使作业崩溃或至少停止摄取。
您可以有一个 ParDo 和一个位于 BQ 写入之前的 DoFn。 DoFn 将负责每隔 X 分钟获取输出 table 模式,并验证要写入的每条记录是否与预期的输出模式匹配(如果不匹配则抛出异常)。
Old Pipeline:
PubSub -> Some Transforms -> BQ Sink
New Pipeline:
PubSub -> Some Transforms -> ParDo(BQ Sink Validator) -> BQ Sink
这样做的好处是,一旦有人修复了输出 table 架构,管道就会恢复。您需要抛出一个很好的错误消息,说明传入的 PubSub 消息有什么问题。
或者,您可以让 BQ Sink Validator
将消息输出到 PubSub DLQ(监控其大小)。在操作上,您必须更新 table,然后将 DLQ 作为输入重新摄取。这样做的好处是只有坏消息才会阻止管道执行。