Dataflow 流式处理管道中的日志跟踪
Log tracing in a Dataflow streaming pipeline
我有一个包含多个管道的数据流设置,从发布子主题中获取数据。由于这些管道散开并与转换器和 DoFunctions 链合并,因此需要跟踪在整个管道中摄取的每个 pubsub 消息。
执行此操作的正确方法是什么?
一些想法:
- 侧输入
- ParDo 函数的每个输入都有一个带有跟踪 ID 等的上下文对象(有点不直观)
谢谢!
我相信你的第二种方法最有意义。
在您的过程元素函数中,您可以捕获任何异常并记录任何失败:
import org.sfl4j.Logger;
import org.slf4j.LoggerFactory;
import ...
public class MyDoFn<ObjectWithPubsubIdA, ObjectWithPubsubIdB> {
private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
@ProcessElement
public void processElement(ProcessContext c) {
ObjectWithPubsubIdA a = c.element();
try {
ObjectWithPubsubIdB b = // transform ObjectWithPubsubIdA ...
c.output(b);
} catch (Exception e) {
LOG.error("MyDoFn failed for message with id {} with exception {}", a.getId(), e);
}
}
}
您可以使用抽象基础 class 或某些其他特定于语言的构造来 re-use 代码,这样您就可以在所有转换中共享一个实现。
我有一个包含多个管道的数据流设置,从发布子主题中获取数据。由于这些管道散开并与转换器和 DoFunctions 链合并,因此需要跟踪在整个管道中摄取的每个 pubsub 消息。
执行此操作的正确方法是什么? 一些想法:
- 侧输入
- ParDo 函数的每个输入都有一个带有跟踪 ID 等的上下文对象(有点不直观)
谢谢!
我相信你的第二种方法最有意义。
在您的过程元素函数中,您可以捕获任何异常并记录任何失败:
import org.sfl4j.Logger;
import org.slf4j.LoggerFactory;
import ...
public class MyDoFn<ObjectWithPubsubIdA, ObjectWithPubsubIdB> {
private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
@ProcessElement
public void processElement(ProcessContext c) {
ObjectWithPubsubIdA a = c.element();
try {
ObjectWithPubsubIdB b = // transform ObjectWithPubsubIdA ...
c.output(b);
} catch (Exception e) {
LOG.error("MyDoFn failed for message with id {} with exception {}", a.getId(), e);
}
}
}
您可以使用抽象基础 class 或某些其他特定于语言的构造来 re-use 代码,这样您就可以在所有转换中共享一个实现。