Flink作业执行过程中如何记录未捕获的异常

How to log uncaught exceptions during Flink job execution

我正在尝试将 Sentry 附加到我们的 Flink 集群以跟踪作业执行情况。 Sentry 充当记录器,它捕获消息并将它们发送到中央服务器。默认情况下,它会捕获所有级别为 WARN 或更高级别的消息。

为了让 Sentry 捕获所有问题,我需要在操作员引发未捕获的异常时写入 WARN 或 ERROR 日志消息。如果重启策略失败,Execution Environment 中的 execute() 方法将抛出最终异常,我可以适当记录。但是我还没有找到一种方法来记录导致作业重新启动的异常。 Flink 将它们记录为 INFO 消息,但这使得它们很难从其余消息中过滤出来。

在 Flink 作业中处理未捕获异常的合适方法是什么?

从 Flink 的角度来看,用户代码错误是意料之中的,因此,Flink 不会在 WARNERROR 上记录它们。 WARNERROR 保留用于记录语句,表明 Flink 本身有问题。

捕获任务失败的最佳选择是 grep for <TASK_NAME> switched from RUNNING to FAILED。这样,只要 <TASK_NAME> 失败,您就会收到通知。但是请注意,不能保证日志记录语句永远不会改变。

免责声明:我(还)不是 flink 的专家

可以找到异常和日志。

一般日志

在Flink应用中配置Flink logging (do not forget to add logback.xml in the resource folder). Do not forget to set necessary log settings

在此之后,我能够看到包括 info 级别的日志消息。要记录“内部”内容 jdbc 考虑:

JdbcSink.sink(
    "insert into my)table (id) values (?);",
    (statement, event) -> {
      Logger logger = LoggerFactory.getLogger(new Object(){}.getClass().getEnclosingClass());
      logger.info("LoggingWorks!!!!");
      try {
        statement.setString(1, UUID.randomUUID().toString());
      } catch (Exception ex) {
        throw new RuntimeException(ex);
      }
    },
    JdbcExecutionOptions.builder()        
        // omited
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        // omited
        .build()
);

LoggerFactory.getLogger(new Object(){}.getClass().getEnclosingClass()); - 不是一个完美的解决方案,但增加了调试的可能性。

异常日志

我无法提供通用的解决方案,但在 aws 中有一个 Apache Flink dashboard 按钮,您可以在其中查看 aws 中部署的应用程序中的所有 运行 Flink 应用程序。只需单击列表中的作业名称,您将能够看到错误的异常页面。

是的,这是针对 AWS 的,但我相信本地集群中存在相同的仪表板。