仅当某些条件为真时才使用来自 Kafka 的消息
Consume messages from Kafka only if some condition true
我们有特定的主题,只有在条件consumeEnabled=true的情况下才需要消费消息。
所以,它应该像这样工作:
- 如果应用程序正在启动并且 consumeEnabled=true,则分配
分区到消费者并使用来自主题的消息。
- 如果应用程序正在启动并且 consumeEnabled=false,则不要将分区分配给消费者,也不要使用来自主题的消息。
- 如果应用程序已经 运行 使用 consumeEnabled=false,但在 运行 时间 属性 变为 consumeEnabled=true,则在 运行 时间将分区分配给消费者并使用来自主题的消息。
应用正在消费消息,但随后consumeEnabled变为false的情况无需考虑。
请帮助定义使用 Spring Kafka and\or Kafka Java 客户端
实施决策的最佳方式
您可以将您的消费者放在一个简单的线程中,该线程切换消费者对象的轮询状态。
public class EnabledConsumer implements Runnable {
private Consumer consumer;
private boolean enabled;
public EnabledConsumer(Consumer consumer, boolean enabled) {
this.consumer = consumer;
this.enabled = enabled;
}
public void setEnabled(boolean enable) {
this.enabled = enable;
}
@Override
public void run() {
while(enabled) {
ConsumerRecords records = consumer.poll(...);
...
}
}
如果您正在使用 @KafkaListener
那么
@KafkaListener(id = "foo", ... , autoStartup="${consume.enabled}")
其中 consume.enabled
是 属性。
要在运行时 start/stop 容器,使用 KafkaListenerEndpointRegistry
bean。
registry.getListenerContainer("foo").start();
我们有特定的主题,只有在条件consumeEnabled=true的情况下才需要消费消息。 所以,它应该像这样工作:
- 如果应用程序正在启动并且 consumeEnabled=true,则分配 分区到消费者并使用来自主题的消息。
- 如果应用程序正在启动并且 consumeEnabled=false,则不要将分区分配给消费者,也不要使用来自主题的消息。
- 如果应用程序已经 运行 使用 consumeEnabled=false,但在 运行 时间 属性 变为 consumeEnabled=true,则在 运行 时间将分区分配给消费者并使用来自主题的消息。
应用正在消费消息,但随后consumeEnabled变为false的情况无需考虑。
请帮助定义使用 Spring Kafka and\or Kafka Java 客户端
实施决策的最佳方式您可以将您的消费者放在一个简单的线程中,该线程切换消费者对象的轮询状态。
public class EnabledConsumer implements Runnable {
private Consumer consumer;
private boolean enabled;
public EnabledConsumer(Consumer consumer, boolean enabled) {
this.consumer = consumer;
this.enabled = enabled;
}
public void setEnabled(boolean enable) {
this.enabled = enable;
}
@Override
public void run() {
while(enabled) {
ConsumerRecords records = consumer.poll(...);
...
}
}
如果您正在使用 @KafkaListener
那么
@KafkaListener(id = "foo", ... , autoStartup="${consume.enabled}")
其中 consume.enabled
是 属性。
要在运行时 start/stop 容器,使用 KafkaListenerEndpointRegistry
bean。
registry.getListenerContainer("foo").start();