仅当某些条件为真时才使用来自 Kafka 的消息

Consume messages from Kafka only if some condition true

我们有特定的主题,只有在条件consumeEnabled=true的情况下才需要消费消息。 所以,它应该像这样工作:

  1. 如果应用程序正在启动并且 consumeEnabled=true,则分配 分区到消费者并使用来自主题的消息。
  2. 如果应用程序正在启动并且 consumeEnabled=false,则不要将分区分配给消费者,也不要使用来自主题的消息。
  3. 如果应用程序已经 运行 使用 consumeEnabled=false,但在 运行 时间 属性 变为 consumeEnabled=true,则在 运行 时间将分区分配给消费者并使用来自主题的消息。

应用正在消费消息,但随后consumeEnabled变为false的情况无需考虑。

请帮助定义使用 Spring Kafka and\or Kafka Java 客户端

实施决策的最佳方式

您可以将您的消费者放在一个简单的线程中,该线程切换消费者对象的轮询状态。

public class EnabledConsumer implements Runnable {

    private Consumer consumer;
    private boolean enabled;

    public EnabledConsumer(Consumer consumer, boolean enabled) {
        this.consumer = consumer;
        this.enabled = enabled;
    }

    public void setEnabled(boolean enable) {
        this.enabled = enable;
    }

    @Override
    public void run() {
        while(enabled) {
            ConsumerRecords records = consumer.poll(...);
            ...
        }

}

如果您正在使用 @KafkaListener 那么

@KafkaListener(id = "foo", ... , autoStartup="${consume.enabled}")

其中 consume.enabled 是 属性。

要在运行时 start/stop 容器,使用 KafkaListenerEndpointRegistry bean。

registry.getListenerContainer("foo").start();