是否可以按保留时间过滤 Apache Kafka 消息?
Is it possible to filter Apache Kafka messages by retention time?
从抽象的角度来看,Apache Kafka 将数据存储在主题中。消费者可以读取此数据。
我想要一个(监视器)-consumer,它可以 greps 具有特定年龄的数据。监视器应向子系统发送警告,指出记录仍未被读取,如果达到保留时间将被 Kafka 丢弃。
一直没找到合适的方法。
您可以使用 KafkaConsumer.offsetsForTimes()
将消息映射到日期。
例如,如果您用昨天的日期调用它并且它 returns 偏移量 X,那么偏移量小于 X 的任何消息都比昨天早。
然后您的逻辑可以根据消费者的当前位置计算出您是否面临丢弃未处理记录的风险。
请注意,目前正在讨论一个 KIP 来公开指标来跟踪:https://cwiki.apache.org/confluence/display/KAFKA/KIP-223+-+Add+per-topic+min+lead+and+per-partition+lead+metrics+to+KafkaConsumer
从抽象的角度来看,Apache Kafka 将数据存储在主题中。消费者可以读取此数据。
我想要一个(监视器)-consumer,它可以 greps 具有特定年龄的数据。监视器应向子系统发送警告,指出记录仍未被读取,如果达到保留时间将被 Kafka 丢弃。
一直没找到合适的方法。
您可以使用 KafkaConsumer.offsetsForTimes()
将消息映射到日期。
例如,如果您用昨天的日期调用它并且它 returns 偏移量 X,那么偏移量小于 X 的任何消息都比昨天早。
然后您的逻辑可以根据消费者的当前位置计算出您是否面临丢弃未处理记录的风险。
请注意,目前正在讨论一个 KIP 来公开指标来跟踪:https://cwiki.apache.org/confluence/display/KAFKA/KIP-223+-+Add+per-topic+min+lead+and+per-partition+lead+metrics+to+KafkaConsumer