发生异常时如何禁用记录 Kafka 批处理中的所有消息?
How to disable logging all messages in a Kafka batch in case of an exception?
在批处理中使用 @KafkaListener
时,错误处理程序会记录完整批处理的内容(所有消息)以防出现异常。
我怎样才能让它不那么冗长?我想避免使用所有消息向日志文件发送垃圾邮件,而只看到实际的异常。
这是我的消费者目前的样子的一个最小示例:
@Component
class TestConsumer {
@Bean
fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
val configs = kafkaProperties.buildConsumerProperties()
configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10000
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
factory.isBatchListener = true
return factory
}
@KafkaListener(
topics = ["myTopic"],
containerFactory = "kafkaBatchListenerContainerFactory"
)
fun batchListen(values: List<ConsumerRecord<String, String>>) {
// Something that might throw an exception in rare cases.
}
}
您使用的是什么版本?
此容器 属性 是在 2.2.14 中添加的。
/**
* Set to false to log {@code record.toString()} in log messages instead
* of {@code topic-partition@offset}.
* @param onlyLogRecordMetadata false to log the entire record.
* @since 2.2.14
*/
public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
}
它一直是 true by default since version 2.7(这就是为什么 javadoc 现在这样读)。
这是以前的 javadoc:
/**
* Set to true to only log {@code topic-partition@offset} in log messages instead
* of {@code record.toString()}.
* @param onlyLogRecordMetadata true to only log the topic/parrtition/offset.
* @since 2.2.14
*/
此外,从 2.5 版开始,您可以在错误处理程序上设置日志级别:
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
Assert.notNull(logLevel, "'logLevel' cannot be null");
this.logLevel = logLevel;
}
在批处理中使用 @KafkaListener
时,错误处理程序会记录完整批处理的内容(所有消息)以防出现异常。
我怎样才能让它不那么冗长?我想避免使用所有消息向日志文件发送垃圾邮件,而只看到实际的异常。
这是我的消费者目前的样子的一个最小示例:
@Component
class TestConsumer {
@Bean
fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
val configs = kafkaProperties.buildConsumerProperties()
configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10000
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
factory.isBatchListener = true
return factory
}
@KafkaListener(
topics = ["myTopic"],
containerFactory = "kafkaBatchListenerContainerFactory"
)
fun batchListen(values: List<ConsumerRecord<String, String>>) {
// Something that might throw an exception in rare cases.
}
}
您使用的是什么版本?
此容器 属性 是在 2.2.14 中添加的。
/**
* Set to false to log {@code record.toString()} in log messages instead
* of {@code topic-partition@offset}.
* @param onlyLogRecordMetadata false to log the entire record.
* @since 2.2.14
*/
public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
}
它一直是 true by default since version 2.7(这就是为什么 javadoc 现在这样读)。
这是以前的 javadoc:
/**
* Set to true to only log {@code topic-partition@offset} in log messages instead
* of {@code record.toString()}.
* @param onlyLogRecordMetadata true to only log the topic/parrtition/offset.
* @since 2.2.14
*/
此外,从 2.5 版开始,您可以在错误处理程序上设置日志级别:
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
Assert.notNull(logLevel, "'logLevel' cannot be null");
this.logLevel = logLevel;
}