Spring Kafka 在容器的 CommonErrorHandler 上设置日志级别
Spring Kafka set logging level on container's CommonErrorHandler
我有一个 Kafka 消费者应用程序,目前有一个自定义的 CommonErrorHandler 来处理抛出的 @KafkaListener
异常。我有一个自定义的 FixedBackOff 策略,它将重试最多 3 次,然后将记录发布到 DLQ 主题,但每次重试时都会打印错误的整个堆栈跟踪和我想知道我是否可以将其抑制到 DEBUG 级别,这样它就不会使控制台输出混乱?这是我目前拥有的 (Kotlin):
factory.setCommonErrorHandler(
DefaultErrorHandler({ record, exception ->
val thrownException = exception.cause ?: exception.localizedMessage
log.error(
"Kafka record offset=${record.offset()} is being sent to the DLQ due to=" +
"$thrownException"
)
val producerRecord = ProducerRecord<String, String>(dlqTopic, record.value().toString())
producerRecord.headers().add("dlq-failure-reason", thrownException.toString().toByteArray())
kafkaTemplate?.send(producerRecord)
}, FixedBackOff(0L, 2L))
我看过这里:https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers,它提到了这一点,“所有框架错误处理程序都扩展了 KafkaExceptionLogLevelAware
,它允许您控制记录这些异常的级别。”但是,我尝试通过扩展 class 并显式设置它的日志级别来设置日志级别,但无济于事。我错过了什么吗?
你不需要继承它,只需设置 属性.
KafkaExceptionLogLevelAware
所做的只是在错误处理程序抛出的 KafkaException
上设置日志级别;它是容器,然后通过异常的 selfLog
方法记录它。
您正在 error
级别(恢复期间)自行记录。
编辑
适合我...
@SpringBootApplication
public class So71373630Application {
public static void main(String[] args) {
SpringApplication.run(So71373630Application.class, args).close();
}
@Bean
DefaultErrorHandler eh() {
DefaultErrorHandler eh = new DefaultErrorHandler((r, ex) -> System.out.println("Failed:" + r.value()),
new FixedBackOff(0L, 3L));
eh.setLogLevel(Level.DEBUG);
return eh;
}
@KafkaListener(id = "so71373630", topics = "so71373630")
void listen(String in) {
System.out.println(in);
throw new RuntimeException("test");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so71373630").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so71373630", "foo");
Thread.sleep(5000);
};
}
}
2022-03-07 10:10:35.460 INFO 31409 --- [o71373630-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71373630: partitions assigned: [so71373630-0]
foo
2022-03-07 10:10:35.492 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
2022-03-07 10:10:35.987 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
2022-03-07 10:10:36.490 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
Failed:foo
我有一个 Kafka 消费者应用程序,目前有一个自定义的 CommonErrorHandler 来处理抛出的 @KafkaListener
异常。我有一个自定义的 FixedBackOff 策略,它将重试最多 3 次,然后将记录发布到 DLQ 主题,但每次重试时都会打印错误的整个堆栈跟踪和我想知道我是否可以将其抑制到 DEBUG 级别,这样它就不会使控制台输出混乱?这是我目前拥有的 (Kotlin):
factory.setCommonErrorHandler(
DefaultErrorHandler({ record, exception ->
val thrownException = exception.cause ?: exception.localizedMessage
log.error(
"Kafka record offset=${record.offset()} is being sent to the DLQ due to=" +
"$thrownException"
)
val producerRecord = ProducerRecord<String, String>(dlqTopic, record.value().toString())
producerRecord.headers().add("dlq-failure-reason", thrownException.toString().toByteArray())
kafkaTemplate?.send(producerRecord)
}, FixedBackOff(0L, 2L))
我看过这里:https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers,它提到了这一点,“所有框架错误处理程序都扩展了 KafkaExceptionLogLevelAware
,它允许您控制记录这些异常的级别。”但是,我尝试通过扩展 class 并显式设置它的日志级别来设置日志级别,但无济于事。我错过了什么吗?
你不需要继承它,只需设置 属性.
KafkaExceptionLogLevelAware
所做的只是在错误处理程序抛出的 KafkaException
上设置日志级别;它是容器,然后通过异常的 selfLog
方法记录它。
您正在 error
级别(恢复期间)自行记录。
编辑
适合我...
@SpringBootApplication
public class So71373630Application {
public static void main(String[] args) {
SpringApplication.run(So71373630Application.class, args).close();
}
@Bean
DefaultErrorHandler eh() {
DefaultErrorHandler eh = new DefaultErrorHandler((r, ex) -> System.out.println("Failed:" + r.value()),
new FixedBackOff(0L, 3L));
eh.setLogLevel(Level.DEBUG);
return eh;
}
@KafkaListener(id = "so71373630", topics = "so71373630")
void listen(String in) {
System.out.println(in);
throw new RuntimeException("test");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so71373630").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so71373630", "foo");
Thread.sleep(5000);
};
}
}
2022-03-07 10:10:35.460 INFO 31409 --- [o71373630-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71373630: partitions assigned: [so71373630-0]
foo
2022-03-07 10:10:35.492 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
2022-03-07 10:10:35.987 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
2022-03-07 10:10:36.490 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
Failed:foo