Dataflow 流式处理管道中的日志跟踪

Log tracing in a Dataflow streaming pipeline

我有一个包含多个管道的数据流设置,从发布子主题中获取数据。由于这些管道散开并与转换器和 DoFunctions 链合并,因此需要跟踪在整个管道中摄取的每个 pubsub 消息。

执行此操作的正确方法是什么? 一些想法:

  1. 侧输入
  2. ParDo 函数的每个输入都有一个带有跟踪 ID 等的上下文对象(有点不直观)

谢谢!

我相信你的第二种方法最有意义。

在您的过程元素函数中,您可以捕获任何异常并记录任何失败:

import org.sfl4j.Logger;
import org.slf4j.LoggerFactory;
import ...

public class MyDoFn<ObjectWithPubsubIdA, ObjectWithPubsubIdB> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);

  @ProcessElement
  public void processElement(ProcessContext c) {
    ObjectWithPubsubIdA a = c.element();
    try {
      ObjectWithPubsubIdB b = // transform ObjectWithPubsubIdA ...
      c.output(b);
    } catch (Exception e) {
      LOG.error("MyDoFn failed for message with id {} with exception {}", a.getId(), e);
    }
  }
}

您可以使用抽象基础 class 或某些其他特定于语言的构造来 re-use 代码,这样您就可以在所有转换中共享一个实现。