Apache Beam 未从订阅中删除无效元素

Apache Beam not removing an invalid element from subscription

刚刚意识到我的管道在处理错误事件时是错误的,它们继续被处理并且从未从订阅中删除。

基本上我有一个简单的管道,其中包含一个触发器,可以将这些事件提取到一个文件中。

在其中一个阶段中,它会处理通过 PubSub 收到的消息的有效负载,并将其转发到下一阶段。但是,在某些情况下这会失败。

        pipeline
        .apply("Read PubSub Events",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
        .apply("Map to MyClass",
            ParDo.of(new PubSubMessageToMyClass())) // Exception thrown in this stage.
        .apply("Apply Timestamps", WithTimestamps.of(new SetTimestampFn()).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
        ...
        );

当错误发生时,我一直在管道中一遍又一遍地看到相同的事件,就像它永远不会结束处理一样。

有什么方法可以明确告诉 Apache Beam 使给定消息无效并防止进一步的失败处理?

Dataflow 处理任意包中的元素,并在该包中的任何元素抛出错误时重试完整包。当 运行 处于批处理模式时,包含失败项目的捆绑包将重试 4 次。 当单个包失败 4 次时,管道将完全失败。当 运行 处于流模式时,包含 失败项目的包将无限期地重试,这可能会导致您的管道永久停滞。

考虑通过添加异常处理程序来防止代码中出现错误。例如,如果您想删除在 ParDo 中完成的某些自定义输入验证失败的元素,请在 ParDo 中使用 try/catch 块来处理异常并删除元素。