暂停主kafka流的消息消费,从其他kafka主题开始
Pause message consumption from main kafka stream and start from other kafka topic
我正在使用@StreamListener(spring 云流)来消费来自主题(输入通道)的消息,进行一些处理并保存到一些缓存中或数据库。
我的要求是,如果 DB 在处理消费消息时出现故障,我想暂停主要消费者(输入通道),并从另一个主题(INPUT56-CHANNEL)开始消费,并且一旦它消费来自 INPUT56-CHANNEL 的所有消息(没有很多),我想再次恢复主要消费者(输入通道)。
能实现吗??
@StreamListener
已弃用;您应该改用函数式编程模型。
这是一个使用该模型的示例(但相同的技术适用于已弃用的侦听器)。
spring.cloud.function.definition=input1;input2
spring.cloud.stream.bindings.input1-in-0.group=grp1
spring.cloud.stream.bindings.input2-in-0.consumer.auto-startup=false
spring.cloud.stream.bindings.input2-in-0.group=grp2
spring.cloud.stream.kafka.bindings.input2-in-0.consumer.idle-event-interval=5000
@SpringBootApplication
public class So69726610Application {
public static void main(String[] args) {
SpringApplication.run(So69726610Application.class, args);
}
boolean dbIsDown = true;
@Autowired
BindingsLifecycleController controller;
TaskExecutor exec = new SimpleAsyncTaskExecutor();
@Bean
public Consumer<String> input1() {
return str -> {
System.out.println(str);
if (this.dbIsDown) {
this.controller.changeState("input1-in-0", State.PAUSED);
this.controller.changeState("input2-in-0", State.STARTED);
throw new RuntimeException("Paused");
}
};
}
@Bean
public Consumer<String> input2() {
return System.out::println;
}
@EventListener
public void idle(ListenerContainerIdleEvent event) {
System.out.println(event);
// assumes concurrency = 1 (default)
if (event.getListenerId().contains("input2-in-0")) {
this.controller.changeState("input1-in-0", State.RESUMED);
this.exec.execute(() -> this.controller.changeState("input2-in-0", State.STOPPED));
}
}
}
我正在使用@StreamListener(spring 云流)来消费来自主题(输入通道)的消息,进行一些处理并保存到一些缓存中或数据库。
我的要求是,如果 DB 在处理消费消息时出现故障,我想暂停主要消费者(输入通道),并从另一个主题(INPUT56-CHANNEL)开始消费,并且一旦它消费来自 INPUT56-CHANNEL 的所有消息(没有很多),我想再次恢复主要消费者(输入通道)。
能实现吗??
@StreamListener
已弃用;您应该改用函数式编程模型。
这是一个使用该模型的示例(但相同的技术适用于已弃用的侦听器)。
spring.cloud.function.definition=input1;input2
spring.cloud.stream.bindings.input1-in-0.group=grp1
spring.cloud.stream.bindings.input2-in-0.consumer.auto-startup=false
spring.cloud.stream.bindings.input2-in-0.group=grp2
spring.cloud.stream.kafka.bindings.input2-in-0.consumer.idle-event-interval=5000
@SpringBootApplication
public class So69726610Application {
public static void main(String[] args) {
SpringApplication.run(So69726610Application.class, args);
}
boolean dbIsDown = true;
@Autowired
BindingsLifecycleController controller;
TaskExecutor exec = new SimpleAsyncTaskExecutor();
@Bean
public Consumer<String> input1() {
return str -> {
System.out.println(str);
if (this.dbIsDown) {
this.controller.changeState("input1-in-0", State.PAUSED);
this.controller.changeState("input2-in-0", State.STARTED);
throw new RuntimeException("Paused");
}
};
}
@Bean
public Consumer<String> input2() {
return System.out::println;
}
@EventListener
public void idle(ListenerContainerIdleEvent event) {
System.out.println(event);
// assumes concurrency = 1 (default)
if (event.getListenerId().contains("input2-in-0")) {
this.controller.changeState("input1-in-0", State.RESUMED);
this.exec.execute(() -> this.controller.changeState("input2-in-0", State.STOPPED));
}
}
}