Spring Kafka: get topic size even when the topic is empty

I want to release a CountDownLatch once all entries of a topic have been read successfully. My current approach with Spring Kafka is:

private final CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(
  topicPartitions = @TopicPartition(topic = KAFKA_TOPIC, partitions = {"0"}),
  containerFactory = "factory")
public void consume(
    List<Payload> payload,
    Consumer<?, ?> consumer,
    @Header(KafkaHeaders.OFFSET) Long offset) {
        // handle all entries:
        // payload.forEach(eventHandler::handle);

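        // end offset of the partition: offset of the last message + 1
        // (endOffsets returns Map<TopicPartition, Long>, so this must be a long, not an int)
        long endOffset = consumer.endOffsets(List.of(TOPIC_PART)).get(TOPIC_PART);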

        // when we have processed the last entry, we unlock the latch.
        if (offset + payload.size() >= endOffset) {
            latch.countDown();
        }
}

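The problem is that when the Kafka topic is empty, the latch is never released, because the listener method never runs. How can I release the latch when the topic is empty?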

See https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#idle-containers and https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#event-consumption:

The following Spring application events are published by listener containers and their consumers:

...

ListenerContainerIdleEvent: published when no messages have been received in idleInterval (if configured).

...

The ListenerContainerIdleEvent has the following properties:

source: The listener container instance that published the event.

container: The listener container or the parent listener container, if the source container is a child.

id: The listener ID (or container bean name).

idleTime: The time the container had been idle when the event was published.

topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.

consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, it can resume() when the event is received.

paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
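Applied to the question, the idle event gives a hook that fires even when the listener method never runs. The following is a minimal sketch of the latch logic only, written without Spring types so it stands alone. `TopicDrainLatch` and its method names are hypothetical, not part of Spring Kafka. It assumes an idle interval has been configured on the container (for example via `ContainerProperties.setIdleEventInterval`), and that the idle-event handler passes in the values of `consumer.position(partition)` and `consumer.endOffsets(...)` obtained from `event.getConsumer()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Spring Kafka): releases a latch once the
// consumer has caught up with the end of the partition, including the case
// where the partition is empty and no batch is ever delivered.
final class TopicDrainLatch {

    private final CountDownLatch latch = new CountDownLatch(1);

    // Intended to be called from the @KafkaListener method after a batch has
    // been handled. Mirrors the check in the question:
    // firstOffset + batchSize >= endOffset.
    void onBatch(long firstOffset, int batchSize, long endOffset) {
        releaseIfCaughtUp(firstOffset + batchSize, endOffset);
    }

    // Intended to be called from an @EventListener method that receives a
    // ListenerContainerIdleEvent. position is the consumer's next offset to
    // read; for an empty partition it equals endOffset.
    void onIdle(long position, long endOffset) {
        releaseIfCaughtUp(position, endOffset);
    }

    private void releaseIfCaughtUp(long nextOffset, long endOffset) {
        if (nextOffset >= endOffset) {
            latch.countDown(); // calling this more than once is harmless
        }
    }

    // True once the latch has been released.
    boolean isDrained() {
        return latch.getCount() == 0;
    }

    // Lets other threads block until the topic has been read completely.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}
```

With an empty topic, the first idle event reports a position equal to the end offset, so `onIdle` releases the latch. With a non-empty topic, whichever of `onBatch` or a later `onIdle` first sees the consumer caught up releases it.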