Kafka ConsumerInterceptor 所需的配置

required configurations for Kafka ConsumerInterceptor

我是卡夫卡的新手。我已经设置了我的环境来生成和使用记录。但是,我的目标是在将记录发送给目标消费者之前拦截记录并修改它们的值。 现在,为了确保环境设置良好以拦截记录,我正在编写一个简单的 ConsumerInterceptor,它将拦截记录并打印它们的值。 我的 configure() 方法应该实现什么来启用我的 consumerInterceptor?我应该 add/modify 还有什么其他配置?

public class SimpleConsumerInterceptors<K, V> implements ConsumerInterceptor<K, V>{
    private String clientId;

    public void configure(final Map<String, ?> configs) {

// What configurations required to enable my consumerInterceptor?
        
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (TopicPartition partition : records.partitions()) {
            String topic = partition.topic();
            List<ConsumerRecord<K, V>> recordsInPartition = records.records(partition);
            for (ConsumerRecord<K, V> record : recordsInPartition) {
                System.out.println("onConsume:");
                System.out.println(record.value());
            }
        }
                 
        return interceptRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("onCommit")
    }

    @Override
    public void close() {
        System.out.println("close")
        this.close();
    }
}

您只需将 class 名称添加到 interceptor.classes consumer propertyConsumerConfig.INTERCEPTOR_CLASSES_CONFIG

如果你想在Kafka创建实例后配置拦截器,你只需要实现configure()

传递给方法的地图是消费者属性的地图。

那么,假设您想将日志记录设为可选;在消费者配置中添加my.interceptor.logging.enabled=true,在configure()方法中使用它来配置是否记录记录。