不同的消费者有各自的过滤策略

Different consumers with their respective filter strategies

我们可以在 Spring Boot 应用程序中让 2 个不同的消费者使用各自的过滤策略吗?如果可以,我们如何将过滤策略关联到各自的消费者? (Spring Boot 2.3.8) 注意:两个消费者的属性不同。

/**
 * Kafka listener for {@code topic1} in consumer group {@code group-id1}.
 *
 * <p>The consumer settings are supplied inline through {@code properties}: bootstrap server,
 * auto-commit disabled, String key/value deserializers, and two values resolved from the
 * {@code max.poll.interval.ms} and {@code auto.offset.reset} placeholders. Every key is taken
 * from {@link ConsumerConfig} so the entries stay consistent with each other.
 *
 * <p>NOTE(review): this snippet declares a second {@code onMessage} with the same signature for
 * {@code topic2}; two such methods cannot live in one class, so presumably they belong to
 * different classes -- confirm.
 *
 * @param consumerRecord the record received from {@code topic1}
 * @param acknowledgment handle passed on to {@code processMessageAndAcknowledge}
 */
@KafkaListener(topics = "topic1", groupId = "group-id1",
        properties = {
                ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ": ${max.poll.interval.ms}",
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":localhost:9092",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ":false",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + ":${auto.offset.reset}"
        }, autoStartup = "true")
public void onMessage(
        @Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
    processMessageAndAcknowledge(consumerRecord, acknowledgment);
}
  
  
   /**
    * Placeholder filter strategy named for {@code topic1}; it returns {@code true} for every
    * record. Nothing in this snippet attaches it to a listener.
    *
    * <p>NOTE(review): in the complete example further down, records for which the strategy
    * returns {@code true} are not delivered to the listener, so this placeholder would
    * presumably drop everything -- confirm the intended predicate.
    *
    * @return a strategy whose result is always {@code true}
    */
   @Bean
   public RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic1() {
       return consumerRecord -> true;
   }
    
    /**
     * Kafka listener for {@code topic2} in consumer group {@code group-id2}.
     *
     * <p>The consumer settings are supplied inline through {@code properties}: bootstrap server,
     * auto-commit disabled, String key/value deserializers, and two values resolved from the
     * {@code max.poll.interval.ms} and {@code auto.offset.reset} placeholders. Every key is taken
     * from {@link ConsumerConfig} so the entries stay consistent with each other.
     *
     * <p>NOTE(review): this snippet declares another {@code onMessage} with the same signature
     * for {@code topic1}; two such methods cannot live in one class, so presumably they belong to
     * different classes -- confirm.
     *
     * @param consumerRecord the record received from {@code topic2}
     * @param acknowledgment handle passed on to {@code processMessageAndAcknowledge}
     */
    @KafkaListener(topics = "topic2", groupId = "group-id2",
            properties = {
                    ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ": ${max.poll.interval.ms}",
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":localhost:9092",
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ":false",
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + ":${auto.offset.reset}"
            }, autoStartup = "true")
    public void onMessage(
            @Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
        processMessageAndAcknowledge(consumerRecord, acknowledgment);
    }
  
  /**
   * Placeholder filter strategy named for {@code topic2}; it returns {@code true} for every
   * record. Nothing in this snippet attaches it to a listener.
   *
   * <p>NOTE(review): in the complete example further down, records for which the strategy
   * returns {@code true} are not delivered to the listener, so this placeholder would
   * presumably drop everything -- confirm the intended predicate.
   *
   * @return a strategy whose result is always {@code true}
   */
  @Bean
  public RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic2() {
      return consumerRecord -> true;
  }

目前无法在侦听器级别覆盖过滤策略;您需要创建两个容器工厂。欢迎在 GitHub 上提交 issue 来请求这一新功能: https://github.com/spring-projects/spring-kafka/issues

一个快速的解决方案是编写一个包装器策略,根据 ConsumerRecord 中的主题名称委托给一个或另一个过滤器。

// Wrapper strategy: records from "topic1" go to the topic1 filter, everything else to the
// topic2 filter.
return consumerRecord -> {
    if (consumerRecord.topic().equals("topic1")) {
        return recordFilterStrategyForTopic1.filter(consumerRecord);
    }
    return recordFilterStrategyForTopic2.filter(consumerRecord);
};

编辑

这是一个完整的例子:

/**
 * Demo application: two Kafka listeners on two topics, two filter strategy beans, and a runner
 * that publishes ten messages to each topic.
 */
@SpringBootApplication
public class So67391819Application {

    public static void main(String[] args) {
        SpringApplication.run(So67391819Application.class, args);
    }

    /**
     * Prints each record delivered from topic {@code so67391819-1}.
     *
     * @param in the record payload
     * @param offset the record offset, taken from the {@code KafkaHeaders.OFFSET} header
     */
    @KafkaListener(id = "so67391819-1", topics = "so67391819-1")
    void listen1(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        final String line = "listener1 " + in + "@" + offset;
        System.out.println(line);
    }

    /**
     * Prints each record delivered from topic {@code so67391819-2}.
     *
     * @param in the record payload
     * @param offset the record offset, taken from the {@code KafkaHeaders.OFFSET} header
     */
    @KafkaListener(id = "so67391819-2", topics = "so67391819-2")
    void listen2(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        final String line = "listener2 " + in + "@" + offset;
        System.out.println(line);
    }

    /** Declares topic {@code so67391819-1} with one partition and one replica. */
    @Bean
    NewTopic topic1() {
        return TopicBuilder.name("so67391819-1")
                .partitions(1)
                .replicas(1)
                .build();
    }

    /** Declares topic {@code so67391819-2} with one partition and one replica. */
    @Bean
    NewTopic topic2() {
        return TopicBuilder.name("so67391819-2")
                .partitions(1)
                .replicas(1)
                .build();
    }

    /**
     * Filter strategy that returns {@code true} for records at even offsets. In the sample
     * output of this example, listener1 only prints odd offsets, i.e. the records matched here
     * are not delivered.
     */
    @Bean
    RecordFilterStrategy<String, String> f1() {
        return consumerRecord -> {
            final long remainder = consumerRecord.offset() % 2;
            return remainder == 0;
        };
    }

    /**
     * Filter strategy that returns {@code true} for records at odd offsets. In the sample
     * output of this example, listener2 only prints even offsets, i.e. the records matched here
     * are not delivered.
     */
    @Bean
    RecordFilterStrategy<String, String> f2() {
        return consumerRecord -> {
            final long remainder = consumerRecord.offset() % 2;
            return remainder == 1;
        };
    }

    /**
     * Publishes {@code foo0..foo9} to {@code so67391819-1} and {@code bar0..bar9} to
     * {@code so67391819-2}, alternating between the two topics.
     *
     * @param template the template used to send the messages
     * @return the runner that performs the sends
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            for (int i = 0; i < 10; i++) {
                template.send("so67391819-1", "foo" + i);
                template.send("so67391819-2", "bar" + i);
            }
        };
    }

}

/**
 * Installs one topic-aware filter strategy on the listener container factory: records whose
 * topic name ends with {@code "-1"} are checked by {@code f1}, all others by {@code f2}.
 */
@Component
class FactoryCustomizer {

    /**
     * @param factory the container factory that receives the delegating filter strategy
     * @param f1 filter applied to records from topics ending in {@code "-1"}
     * @param f2 filter applied to records from every other topic
     */
    FactoryCustomizer(ConcurrentKafkaListenerContainerFactory<String, String> factory,
            RecordFilterStrategy<String, String> f1,
            RecordFilterStrategy<String, String> f2) {

        // NOTE(review): the parameter names f1/f2 match the filter bean names in this example and
        // are presumably what tells the two same-typed beans apart -- keep them as they are.
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (consumerRecord.topic().endsWith("-1")) {
                return f1.filter(consumerRecord);
            }
            return f2.filter(consumerRecord);
        });
    }

}
listener1 foo1@1
listener2 bar0@0
listener1 foo3@3
listener1 foo5@5
listener2 bar2@2
listener1 foo7@7
listener2 bar4@4
listener1 foo9@9
listener2 bar6@6
listener2 bar8@8