如何 Crash/Stop DataFlow Pub/Sub BigQuery 插入错误时的摄取

How to Crash/Stop DataFlow Pub/Sub Ingestion on BigQuery Insert Error

我正在寻找一种方法,使 Google DataFlow 作业在发生(特定)异常时停止从 Pub/Sub 摄取。

来自 Pub/Sub 的事件通过 PubsubIO.Read.Bound<TableRow> 使用 TableRowJsonCoder JSON 读取并直接流式传输到 BigQuery BigQueryIO.Write.Bound。 (中间有一个 ParDo 可以更改一个字段的内容和一些按天发生的自定义分区,但这与此目的无关。)

当 events/rows 从 PubSub 中提取的字段不是目标 BigQuery table 中的列时,DataFlow 作业在 运行 时记录 IOExceptions 声称它无法插入行,但似乎承认这些消息并继续 运行ning。

我想做的是停止从 Pub/Sub and/or 中摄取消息,使数据流作业崩溃,这样警报就可以基于最旧的未确认消息的年龄。至少我想确保那些 Pub/Sub 未能插入到 BigQuery 的消息不会被确认,以便我可以解决问题,重新启动数据流作业并再次使用这些消息。

我知道这里描述了一种处理错误输入的建议解决方案:https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow

我也知道 Apache Beam 上的这个 PR,它允许插入没有违规字段的行: https://github.com/apache/beam/pull/1778

但是在我的例子中,我真的不想防止错误输入,而是防止程序员错误,即新字段被添加到 JSON 消息中,这些消息被推送到 Pub/Sub,但相应的 DataFlow 作业未更新。所以我真的没有错误的数据,我只是想在程序员犯错误时崩溃,因为在更改消息格式之前没有部署新的数据流作业。

我假设可以(类似于博客 post 解决方案)创建一个自定义 ParDo 来验证每一行并抛出未捕获并导致崩溃的异常.

但理想情况下,我只希望有一些配置不处理插入错误并记录它,而只是使作业崩溃或至少停止摄取。

您可以有一个 ParDo 和一个位于 BQ 写入之前的 DoFn。 DoFn 将负责每隔 X 分钟获取输出 table 模式,并验证要写入的每条记录是否与预期的输出模式匹配(如果不匹配则抛出异常)。

Old Pipeline:
PubSub -> Some Transforms -> BQ Sink

New Pipeline:
PubSub -> Some Transforms -> ParDo(BQ Sink Validator) -> BQ Sink

这样做的好处是,一旦有人修复了输出 table 架构,管道就会恢复。您需要抛出一个很好的错误消息,说明传入的 PubSub 消息有什么问题。

或者,您可以让 BQ Sink Validator 将消息输出到 PubSub DLQ(监控其大小)。在操作上,您必须更新 table,然后将 DLQ 作为输入重新摄取。这样做的好处是只有坏消息才会阻止管道执行。