根据某些条件调用kafka消费者

Invoking a kafka consumer based on some condition

假设有一个class A正在消费来自kafka消费者C1的数据并且基于来自消费者C1的数据,我需要从消费者C2开始消费。

有办法吗?

这是一种方法:

@SpringBootApplication
public class So62160012Application {

    public static void main(String[] args) {
        SpringApplication.run(So62160012Application.class, args);
    }

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    @KafkaListener(id = "so62160012-1", topics = "so62160012-1")
    public void listen1(String in) {
        System.out.println("one: " + in);
        template.send("so62160012-2", in);
        if (in.contains("startC2")) {
            this.registry.getListenerContainer("so62160012-2").start();
        }
    }

    @KafkaListener(id = "so62160012-2", topics = "so62160012-2", autoStartup = "false")
    public void listen2(String in) {
        System.out.println("two: " + in);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so62160012-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so62160012-2").partitions(1).replicas(1).build();
    }


    @Bean
    public ApplicationRunner runner() {
        return args -> {
            IntStream.range(0, 10)
                    .map(i -> {
                        try {
                            Thread.sleep(2000);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return i;
                    })
                    .forEach(i -> this.template.send("so62160012-1",
                            "foo" + i + (i == 5 ? "startC2" : "")));
        };
    }

}

希望不言自明。