spring kafka,即使为空也获取主题大小
spring kafka, get topic size even when empty
我想在成功读取某个主题的所有条目后解锁 CountDownLatch
。我目前的方法是 Spring Kafka:
private final CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(
topicPartitions = @TopicPartition(topic = KAFKA_TOPIC, partitions = {"0"}),
containerFactory = "factory")
public void consume(
List<Payload> payload,
Consumer<?, ?> consumer,
@Header(KafkaHeaders.OFFSET) Long offset) {
// handle all entries:
// payload.forEach(eventHandler::handle);
// offset of last committed message + 1
int endOffset = consumer.endOffsets(List.of(TOPIC_PART)).get(TOPIC_PART);
// when we have processed the last entry, we unlock the latch.
if (offset + payload.size() >= endOffset) {
latch.countDown();
}
}
问题是当 Kafka 主题为空时 latch
永远不会解锁,因为该方法永远不会 运行。如何在主题为空时解锁闩锁?
见https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#idle-containers and https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#event-consumption
The following Spring application events are published by listener containers and their consumers:
...
ListenerContainerIdleEvent
: published when no messages have been received in idleInterval
(if configured).
...
The ListenerContainerIdleEvent has the following properties:
source: The listener container instance that published the event.
container: The listener container or the parent listener container, if the source container is a child.
id: The listener ID (or container bean name).
idleTime: The time the container had been idle when the event was published.
topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.
consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, it can resume() when the event is received.
paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
我想在成功读取某个主题的所有条目后解锁 CountDownLatch
。我目前的方法是 Spring Kafka:
private final CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(
topicPartitions = @TopicPartition(topic = KAFKA_TOPIC, partitions = {"0"}),
containerFactory = "factory")
public void consume(
List<Payload> payload,
Consumer<?, ?> consumer,
@Header(KafkaHeaders.OFFSET) Long offset) {
// handle all entries:
// payload.forEach(eventHandler::handle);
// offset of last committed message + 1
int endOffset = consumer.endOffsets(List.of(TOPIC_PART)).get(TOPIC_PART);
// when we have processed the last entry, we unlock the latch.
if (offset + payload.size() >= endOffset) {
latch.countDown();
}
}
问题是当 Kafka 主题为空时 latch
永远不会解锁,因为该方法永远不会 运行。如何在主题为空时解锁闩锁?
见https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#idle-containers and https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#event-consumption
The following Spring application events are published by listener containers and their consumers:
...
ListenerContainerIdleEvent
: published when no messages have been received inidleInterval
(if configured).
...
The ListenerContainerIdleEvent has the following properties:
source: The listener container instance that published the event.
container: The listener container or the parent listener container, if the source container is a child.
id: The listener ID (or container bean name).
idleTime: The time the container had been idle when the event was published.
topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.
consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, it can resume() when the event is received.
paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}