Spring Kafka 在侦听器级别按 header 过滤消息

Spring Kafka filtering messages by header on a listener level

我有一个遗留的 kafka 主题,其中发送了不同类型的消息,这些消息是用自定义 header 编写的,带有特定的键来区分记录。 在给定的应用程序上,我有多个方法,我想用自定义注释来注释,比如 @CustomKafkaListener(discriminator="xxx"),它将用 @KafkaListener.

来注释

我如何过滤消息,以便如果我有 2 条消息发送到中心主题,用鉴别符“xxx”注释的方法只会读取这些消息,而用鉴别符“yyy”注释的方法只会读取“ yyy”个。

例如

    @CustomKafkaListener(discriminator="com.mypackage.subpackage", topic="central-topic")
    public void consumerMessagesXXX(ConsumerRecord r){
    // reads only XXXX messages skip all others
    }
    
    
    @CustomKafkaListener(discriminator="com.mypackage", topic="central-topic")
    public void consumerMessagesYYY(ConsumerRecord r){
    // reads only YYY messages skip all others
    }

我希望过滤器能够读取目标侦听器的鉴别器 属性 并动态决定消息是否应由该侦听器通过反射或提供给过滤器的某些元数据进行处理例如

  public boolean filter(ConsumerRecord consumerRecord, Consumer<Long, Event> consumer) {
  var discriminatorPattern = consumer.getMetadataXXX();//retrieve discriminator information either by reflection or metadata
return    
   discriminatorPattern .matches(consumerRecord().lastHeader("discriminator").value());
}

创建自定义注释是一个非常高级的主题;您需要将注释 bean post 处理器子类化,并想出一些机制来通过添加过滤策略 bean 来自定义端点。

欢迎在 GitHub https://github.com/spring-projects/spring-kafka/issues

上提出新功能请求

我们可以添加一个新的 属性 来传递来自 @KafkaListener.

RecordFilterStrategy bean 的 bean 名称

编辑

我看你开了an issue;谢谢。

这是稍后添加过滤器的解决方法...

@SpringBootApplication
public class So71237300Application {

    public static void main(String[] args) {
        SpringApplication.run(So71237300Application.class, args);
    }

    @KafkaListener(id = "xxx", topics = "so71237300", autoStartup = "false")
    void listen1(String in) {
        System.out.println("1:" + in);
    }

    @KafkaListener(id = "yyy", topics = "so71237300", autoStartup = "false")
    void listen2(String in) {
        System.out.println("2:" + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71237300").partitions(1).replicas(1).build();
    }

    @Bean
    RecordFilterStrategy<String, String> xxx() {
        return rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), "xxx".getBytes());
        };
    }

    @Bean
    RecordFilterStrategy<String, String> yyy() {
        return rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), "yyy".getBytes());
        };
    }

    @Bean
    ApplicationRunner runner(RecordFilterStrategy<String, String> xxx, RecordFilterStrategy<String, String> yyy,
            KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {

        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
            record.headers().add("which", "xxx".getBytes());
            template.send(record);
            record = new ProducerRecord<>("so71237300", "test.to.yyy");
            record.headers().add("which", "yyy".getBytes());
            template.send(record);

            updateListener("xxx", xxx, registry);
            updateListener("yyy", yyy, registry);
            registry.start();
        };
    }

    private void updateListener(String id, RecordFilterStrategy<String, String> filter,
            KafkaListenerEndpointRegistry registry) {

        MessageListener listener = (MessageListener) registry.getListenerContainer(id).getContainerProperties()
                .getMessageListener();
        registry.getListenerContainer(id).getContainerProperties()
                .setMessageListener(new FilteringMessageListenerAdapter<>(listener, filter));
    }

}
1:test.to.xxx
2:test.to.yyy

EDIT2

此版本使用单个过滤器并使用消费者的 group.id 作为鉴别器:

@SpringBootApplication
public class So71237300Application {

    public static void main(String[] args) {
        SpringApplication.run(So71237300Application.class, args);
    }

    @KafkaListener(id = "xxx", topics = "so71237300")
    void listen1(String in) {
        System.out.println("1:" + in);
    }

    @KafkaListener(id = "yyy", topics = "so71237300")
    void listen2(String in) {
        System.out.println("2:" + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71237300").partitions(1).replicas(1).build();
    }

    @Bean
    RecordFilterStrategy<String, String> discriminator(
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        RecordFilterStrategy<String, String> filter = rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), KafkaUtils.getConsumerGroupId().getBytes());
        };
        factory.setRecordFilterStrategy(filter);
        return filter;
    }

    @Bean
    ApplicationRunner runner(RecordFilterStrategy<String, String> discriminator,
            KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {

        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
            record.headers().add("which", "xxx".getBytes());
            template.send(record);
            record = new ProducerRecord<>("so71237300", "test.to.yyy");
            record.headers().add("which", "yyy".getBytes());
            template.send(record);
        };
    }

}
1:test.to.xxx
2:test.to.yyy