如何在 apache beam 和数据流中设置 logback MDC?

How to set up logback MDC in apache beam and dataflow?

我们正在使用 apache beam 并希望设置 logback MDC。 logback MDC 是一个很好的资源,当你有一个请求进来并且你存储了一个 userId(在我们的例子中,它是 custId,fileId,requestId),然后每当开发人员登录时,它会神奇地将这些信息标记到开发人员日志中.开发人员不再忘记在他添加的每个日志语句中添加它。

我开始端到端集成类型测试,使用嵌入我们微服务中的 apache beam direct runner 进行测试(在生产中,微服务调用数据流)。目前,我发现在调用 expand() 方法之前 MDC 是好的。一旦调用了 processElement 方法,上下文当然就消失了,因为我在另一个线程中。

所以,首先尝试修复此部分。我应该把这个上下文放在哪里,以便我可以在这个线程的开头恢复它。

例如,如果我有一个 Executor.execute(runnable),那么我只需像这样使用该 runnable 传输上下文

    public class MDCContextRunnable implements Runnable {
    private final Map<String, String> mdcSnapshot;
    private Runnable runnable;

    public MDCContextRunnable(Runnable runnable) {
        this.runnable = runnable;
        mdcSnapshot = MDC.getCopyOfContextMap();
    }


    @Override
    public void run() {
        try {
            MDC.setContextMap(mdcSnapshot);

            runnable.run();
            
        } Catch {
            //Must log errors before mdc is cleared
            log.error("message", e);.  /// Logs error and MDC
        } finally {
            MDC.clear();
        }

    }
}

所以我基本上需要对 apache beam 做同样的事情。我需要

  1. 有一分夺取MDC
  2. 有一点恢复MDC
  3. 有必要清除 MDC 以防止它泄漏到另一个请求(以防万一我错过了似乎时不时发生的事情)

关于如何做到这一点有什么想法吗?

哦,如果框架记录任何异常时 MDC 可以存在,那就加分了!!!! (即理想情况下,框架应该为您做这件事,但 apache beam 似乎没有这样做。大多数网络框架都内置了这个)。

谢谢, 院长

根据您提供的上下文和示例,听起来您想使用 MDC 为您自己的 DoFns 自动捕获更多信息。你最好的选择是,根据你需要上下文可用的生命周期,在你的 DoFns 上使用 StartBundle/FinishBundleSetup/Teardown 方法来创建您的 MDC 上下文(请参阅 了解两者之间的区别)。重要的是,这些方法是针对 DoFn 的每个实例执行的,这意味着它们将在为执行这些 DoFns 而创建的新线程上调用。

引擎盖下

我应该解释这里发生了什么,以及这种方法与您最初的目标有何不同。 Apache Beam 的执行方式是您编写的管道在您自己的机器上执行并执行管道构造(这是所有扩展调用发生的地方)。但是,一旦构建了管道,它就会被发送到通常在单独的应用程序上执行的运行器,除非它是直接运行器,然后运行器要么直接执行您的用户代码,要么在 docker 环境中运行它。

在您最初的方法中,在执行开始之前将 MDC 成功应用于所有日志是有意义的,因为执行可能不仅发生在不同的线程中,而且还可能发生在不同的应用程序或机器中。但是,上述方法作为您的用户代码的一部分执行,因此在那里设置您的 MDC 将允许它在任何 thread/application/machine 正在执行转换的情况下运行。

请记住,每个 DoFn 都会调用这些方法,并且每个线程通常会有多个 DoFns,根据 MDC 的工作方式,您可能需要警惕这一点。