根据某些条件调用 Kafka 消费者
Invoking a Kafka consumer based on some condition
假设有一个类 A 正在通过 Kafka 消费者 C1 消费数据，并且我需要根据从 C1 收到的数据，决定何时让消费者 C2 开始消费。
有办法吗?
这是一种方法:
/**
 * Demo application in which one Kafka listener starts a second, initially
 * stopped listener once it receives a record carrying a trigger marker.
 *
 * <p>The first listener prints each record and forwards it to the second topic.
 * The second listener is declared with {@code autoStartup = "false"} and is
 * started through the {@link KafkaListenerEndpointRegistry} when a record
 * containing {@code "startC2"} arrives on the first topic.
 */
@SpringBootApplication
public class So62160012Application {

    /** Name of the first topic; also used as the id of the first listener. */
    private static final String FIRST = "so62160012-1";

    /** Name of the second topic; also used as the id of the second listener. */
    private static final String SECOND = "so62160012-2";

    /** Substring whose presence in a record triggers the start of the second listener. */
    private static final String START_MARKER = "startC2";

    public static void main(String[] args) {
        SpringApplication.run(So62160012Application.class, args);
    }

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Consumes the first topic: prints the record, forwards it to the second
     * topic and, if the record contains the start marker, starts the second
     * listener container.
     *
     * @param in the record value received from the first topic
     */
    @KafkaListener(id = FIRST, topics = FIRST)
    public void listen1(String in) {
        System.out.println("one: " + in);
        this.template.send(SECOND, in);
        if (in.contains(START_MARKER)) {
            // NOTE(review): whether records forwarded before this point are delivered
            // to the second listener depends on the consumer's offset-reset
            // configuration, which is not shown here - confirm.
            this.registry.getListenerContainer(SECOND).start();
        }
    }

    /**
     * Consumes the second topic and prints each record. The listener is not
     * started automatically; {@link #listen1(String)} starts it on demand.
     *
     * @param in the record value received from the second topic
     */
    @KafkaListener(id = SECOND, topics = SECOND, autoStartup = "false")
    public void listen2(String in) {
        System.out.println("two: " + in);
    }

    /** Declares the first topic with one partition and one replica. */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(FIRST).partitions(1).replicas(1).build();
    }

    /** Declares the second topic with one partition and one replica. */
    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name(SECOND).partitions(1).replicas(1).build();
    }

    /**
     * Produces ten test records ({@code foo0} .. {@code foo9}) to the first
     * topic, pausing two seconds before each one. Record number 5 carries the
     * start marker.
     *
     * <p>If the thread is interrupted while pausing, the interrupt flag is
     * restored and no further records are sent.
     *
     * @return the runner that publishes the test records
     */
    @Bean
    public ApplicationRunner runner() {
        return args -> {
            // A plain loop keeps the blocking sleep out of a stream pipeline,
            // whose intermediate operations are expected to be side-effect free.
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(2000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                this.template.send(FIRST, "foo" + i + (i == 5 ? START_MARKER : ""));
            }
        };
    }

}
希望不言自明。
假设有一个类 A 正在通过 Kafka 消费者 C1 消费数据，并且我需要根据从 C1 收到的数据，决定何时让消费者 C2 开始消费。
有办法吗?
这是一种方法:
/**
 * Demo application in which one Kafka listener starts a second, initially
 * stopped listener once it receives a record carrying a trigger marker.
 *
 * <p>The first listener prints each record and forwards it to the second topic.
 * The second listener is declared with {@code autoStartup = "false"} and is
 * started through the {@link KafkaListenerEndpointRegistry} when a record
 * containing {@code "startC2"} arrives on the first topic.
 */
@SpringBootApplication
public class So62160012Application {

    /** Name of the first topic; also used as the id of the first listener. */
    private static final String FIRST = "so62160012-1";

    /** Name of the second topic; also used as the id of the second listener. */
    private static final String SECOND = "so62160012-2";

    /** Substring whose presence in a record triggers the start of the second listener. */
    private static final String START_MARKER = "startC2";

    public static void main(String[] args) {
        SpringApplication.run(So62160012Application.class, args);
    }

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Consumes the first topic: prints the record, forwards it to the second
     * topic and, if the record contains the start marker, starts the second
     * listener container.
     *
     * @param in the record value received from the first topic
     */
    @KafkaListener(id = FIRST, topics = FIRST)
    public void listen1(String in) {
        System.out.println("one: " + in);
        this.template.send(SECOND, in);
        if (in.contains(START_MARKER)) {
            // NOTE(review): whether records forwarded before this point are delivered
            // to the second listener depends on the consumer's offset-reset
            // configuration, which is not shown here - confirm.
            this.registry.getListenerContainer(SECOND).start();
        }
    }

    /**
     * Consumes the second topic and prints each record. The listener is not
     * started automatically; {@link #listen1(String)} starts it on demand.
     *
     * @param in the record value received from the second topic
     */
    @KafkaListener(id = SECOND, topics = SECOND, autoStartup = "false")
    public void listen2(String in) {
        System.out.println("two: " + in);
    }

    /** Declares the first topic with one partition and one replica. */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(FIRST).partitions(1).replicas(1).build();
    }

    /** Declares the second topic with one partition and one replica. */
    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name(SECOND).partitions(1).replicas(1).build();
    }

    /**
     * Produces ten test records ({@code foo0} .. {@code foo9}) to the first
     * topic, pausing two seconds before each one. Record number 5 carries the
     * start marker.
     *
     * <p>If the thread is interrupted while pausing, the interrupt flag is
     * restored and no further records are sent.
     *
     * @return the runner that publishes the test records
     */
    @Bean
    public ApplicationRunner runner() {
        return args -> {
            // A plain loop keeps the blocking sleep out of a stream pipeline,
            // whose intermediate operations are expected to be side-effect free.
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(2000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                this.template.send(FIRST, "foo" + i + (i == 5 ? START_MARKER : ""));
            }
        };
    }

}
希望不言自明。