Kafka Streams K-Table 大小监控
Kafka Streams K-Table size monitoring
我有一个流拓扑,它从一个主题中消费并运行一个聚合并构建一个 KTable,该 KTable 被具体化到 rocksDB 中。
我有另一个应用程序每天使用同一主题的所有事件,并为满足某些特定条件的事件发送墓碑消息(即不再需要它们)。
聚合处理这个问题并从状态存储中删除,但我正在监视状态存储的大小或更改日志主题——任何真正告诉我 ktable 大小的东西。
我已经公开了 JMX 指标,但似乎没有任何东西可以满足我的需要。我可以看到 "puts" 进入 rocksDB 的总数,但看不到键的总数。
我的应用 spring 启动,我想通过 prometheus 公开指标。
有没有人解决了这个问题或有什么有用的想法?
您可以使用此 KeyValueStore#approximateNumEntries()
访问 KTable 的底层状态存储来获取每个分区中的近似计数,然后将此计数导出到 prometheus(每个分区有一个计数)。
要访问底层状态存储,您可以使用低级处理器 API 通过每个 StreamTask(对应于一个分区)中的每个 ProcessorContext 来访问 KeyValueStore
。只需将 KStream#transformValues()
添加到您的拓扑:
kStream
...
.transformValues(ExtractCountTransformer::new, "your_ktable_name")
...
然后在 ExtractCountTransformer 中将计数提取到普罗米修斯:
@Log4j2
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {
private KeyValueStore<String, String> yourKTableKvStore;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
}
@Override
public String transform(String readOnlyKey, String value) {
//extract count to prometheus
log.debug("partition {} - approx count {}", context.partition(), yourKTableKvStore.approximateNumEntries());
yourKTableKvStore.approximateNumEntries();
return value;
}
@Override
public void close() {
}
}
我有一个流拓扑,它从一个主题中消费并运行一个聚合并构建一个 KTable,该 KTable 被具体化到 rocksDB 中。
我有另一个应用程序每天使用同一主题的所有事件,并为满足某些特定条件的事件发送墓碑消息(即不再需要它们)。 聚合处理这个问题并从状态存储中删除,但我正在监视状态存储的大小或更改日志主题——任何真正告诉我 ktable 大小的东西。
我已经公开了 JMX 指标,但似乎没有任何东西可以满足我的需要。我可以看到 "puts" 进入 rocksDB 的总数,但看不到键的总数。 我的应用 spring 启动,我想通过 prometheus 公开指标。
有没有人解决了这个问题或有什么有用的想法?
您可以使用此 KeyValueStore#approximateNumEntries()
访问 KTable 的底层状态存储来获取每个分区中的近似计数,然后将此计数导出到 prometheus(每个分区有一个计数)。
要访问底层状态存储,您可以使用低级处理器 API 通过每个 StreamTask(对应于一个分区)中的每个 ProcessorContext 来访问 KeyValueStore
。只需将 KStream#transformValues()
添加到您的拓扑:
kStream
...
.transformValues(ExtractCountTransformer::new, "your_ktable_name")
...
然后在 ExtractCountTransformer 中将计数提取到普罗米修斯:
@Log4j2
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {
private KeyValueStore<String, String> yourKTableKvStore;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
}
@Override
public String transform(String readOnlyKey, String value) {
//extract count to prometheus
log.debug("partition {} - approx count {}", context.partition(), yourKTableKvStore.approximateNumEntries());
yourKTableKvStore.approximateNumEntries();
return value;
}
@Override
public void close() {
}
}