Flink作业执行过程中如何记录未捕获的异常
How to log uncaught exceptions during Flink job execution
我正在尝试将 Sentry 附加到我们的 Flink 集群以跟踪作业执行情况。 Sentry 充当记录器,它捕获消息并将它们发送到中央服务器。默认情况下,它会捕获所有级别为 WARN 或更高级别的消息。
为了让 Sentry 捕获所有问题,我需要在操作员引发未捕获的异常时写入 WARN 或 ERROR 日志消息。如果重启策略失败,Execution Environment 中的 execute()
方法将抛出最终异常,我可以适当记录。但是我还没有找到一种方法来记录导致作业重新启动的异常。 Flink 将它们记录为 INFO 消息,但这使得它们很难从其余消息中过滤出来。
在 Flink 作业中处理未捕获异常的合适方法是什么?
从 Flink 的角度来看,用户代码错误是意料之中的,因此,Flink 不会在 WARN
或 ERROR
上记录它们。 WARN
和 ERROR
保留用于记录语句,表明 Flink 本身有问题。
捕获任务失败的最佳选择是 grep for <TASK_NAME> switched from RUNNING to FAILED
。这样,只要 <TASK_NAME>
失败,您就会收到通知。但是请注意,不能保证日志记录语句永远不会改变。
免责声明:我(还)不是 flink 的专家
可以找到异常和日志。
一般日志
在Flink应用中配置Flink logging (do not forget to add logback.xml in the resource folder). Do not forget to set necessary log settings
在此之后,我能够看到包括 info
级别的日志消息。要记录“内部”内容 jdbc 考虑:
JdbcSink.sink(
"insert into my)table (id) values (?);",
(statement, event) -> {
Logger logger = LoggerFactory.getLogger(new Object(){}.getClass().getEnclosingClass());
logger.info("LoggingWorks!!!!");
try {
statement.setString(1, UUID.randomUUID().toString());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
},
JdbcExecutionOptions.builder()
// omited
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
// omited
.build()
);
LoggerFactory.getLogger(new Object(){}.getClass().getEnclosingClass());
- 不是一个完美的解决方案,但增加了调试的可能性。
异常日志
我无法提供通用的解决方案,但在 aws 中有一个 Apache Flink dashboard 按钮,您可以在其中查看 aws 中部署的应用程序中的所有 运行 Flink 应用程序。只需单击列表中的作业名称,您将能够看到错误的异常页面。
是的,这是针对 AWS 的,但我相信本地集群中存在相同的仪表板。
我正在尝试将 Sentry 附加到我们的 Flink 集群以跟踪作业执行情况。 Sentry 充当记录器,它捕获消息并将它们发送到中央服务器。默认情况下,它会捕获所有级别为 WARN 或更高级别的消息。
为了让 Sentry 捕获所有问题,我需要在操作员引发未捕获的异常时写入 WARN 或 ERROR 日志消息。如果重启策略失败,Execution Environment 中的 execute()
方法将抛出最终异常,我可以适当记录。但是我还没有找到一种方法来记录导致作业重新启动的异常。 Flink 将它们记录为 INFO 消息,但这使得它们很难从其余消息中过滤出来。
在 Flink 作业中处理未捕获异常的合适方法是什么?
从 Flink 的角度来看,用户代码错误是意料之中的,因此,Flink 不会在 WARN
或 ERROR
上记录它们。 WARN
和 ERROR
保留用于记录语句,表明 Flink 本身有问题。
捕获任务失败的最佳选择是 grep for <TASK_NAME> switched from RUNNING to FAILED
。这样,只要 <TASK_NAME>
失败,您就会收到通知。但是请注意,不能保证日志记录语句永远不会改变。
免责声明:我(还)不是 flink 的专家
可以找到异常和日志。
一般日志
在Flink应用中配置Flink logging (do not forget to add logback.xml in the resource folder). Do not forget to set necessary log settings
在此之后,我能够看到包括 info
级别的日志消息。要记录“内部”内容 jdbc 考虑:
JdbcSink.sink(
"insert into my)table (id) values (?);",
(statement, event) -> {
Logger logger = LoggerFactory.getLogger(new Object(){}.getClass().getEnclosingClass());
logger.info("LoggingWorks!!!!");
try {
statement.setString(1, UUID.randomUUID().toString());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
},
JdbcExecutionOptions.builder()
// omited
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
// omited
.build()
);
LoggerFactory.getLogger(new Object(){}.getClass().getEnclosingClass());
- 不是一个完美的解决方案,但增加了调试的可能性。
异常日志
我无法提供通用的解决方案,但在 aws 中有一个 Apache Flink dashboard 按钮,您可以在其中查看 aws 中部署的应用程序中的所有 运行 Flink 应用程序。只需单击列表中的作业名称,您将能够看到错误的异常页面。
是的,这是针对 AWS 的,但我相信本地集群中存在相同的仪表板。