发生异常时如何禁用记录 Kafka 批处理中的所有消息?

How to disable logging all messages in a Kafka batch in case of an exception?

在批处理中使用 @KafkaListener 时,错误处理程序会记录完整批处理的内容(所有消息)以防出现异常。

我怎样才能让它不那么冗长?我想避免使用所有消息向日志文件发送垃圾邮件,而只看到实际的异常。

这是我的消费者目前的样子的一个最小示例:

@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) {
        // Something that might throw an exception in rare cases.
    }
}

您使用的是什么版本?

此容器 属性 是在 2.2.14 中添加的。

    /**
     * Set to false to log {@code record.toString()} in log messages instead
     * of {@code topic-partition@offset}.
     * @param onlyLogRecordMetadata false to log the entire record.
     * @since 2.2.14
     */
    public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
        this.onlyLogRecordMetadata = onlyLogRecordMetadata;
    }

它一直是 true by default since version 2.7(这就是为什么 javadoc 现在这样读)。

这是以前的 javadoc:

    /**
     * Set to true to only log {@code topic-partition@offset} in log messages instead
     * of {@code record.toString()}.
     * @param onlyLogRecordMetadata true to only log the topic/parrtition/offset.
     * @since 2.2.14
     */

此外,从 2.5 版开始,您可以在错误处理程序上设置日志级别:

    /**
     * Set the level at which the exception thrown by this handler is logged.
     * @param logLevel the level (default ERROR).
     */
    public void setLogLevel(KafkaException.Level logLevel) {
        Assert.notNull(logLevel, "'logLevel' cannot be null");
        this.logLevel = logLevel;
    }