Kafka Streams 持久存储清理
Kafka Streams Persistent Store cleanup
是否需要一些明确的清理来防止每个持久存储的大小增长过大?我目前正在使用它来计算 DSL API.
中的聚合
我们遇到了类似的问题,我们只是安排了清洁商店的工作
在我们的 processor/transformer 中。
只需实施您的 isDataOld(nextValue) 就可以了。
@Override
public void init(ProcessorContext context) {
this.kvStore = (KeyValueStore<Key, Value>) this.context.getStateStore("KV_STORE_NAME");
this.context.schedule(60000, PunctuationType.STREAM_TIME, (timestamp) -> {
KeyValueIterator<Key, Value> iterator = kvStore.all();
while (iterator.hasNext()){
KeyValue<Key,Value> nextValue = iterator.next();
if isDataOld(nextValue)
kvStore.delete(nextValue.key);
}
});
}
怎么样:
final var store = Stores.persistentSessionStore(MATERIAL_STORE_NAME, Duration.ofDays(10));
return Stores.sessionStoreBuilder(store,
Serdes.Long(),
keySpecificAvroSerde
);
是否需要一些明确的清理来防止每个持久存储的大小增长过大?我目前正在使用它来计算 DSL API.
中的聚合我们遇到了类似的问题,我们只是安排了清洁商店的工作 在我们的 processor/transformer 中。 只需实施您的 isDataOld(nextValue) 就可以了。
@Override
public void init(ProcessorContext context) {
this.kvStore = (KeyValueStore<Key, Value>) this.context.getStateStore("KV_STORE_NAME");
this.context.schedule(60000, PunctuationType.STREAM_TIME, (timestamp) -> {
KeyValueIterator<Key, Value> iterator = kvStore.all();
while (iterator.hasNext()){
KeyValue<Key,Value> nextValue = iterator.next();
if isDataOld(nextValue)
kvStore.delete(nextValue.key);
}
});
}
怎么样:
final var store = Stores.persistentSessionStore(MATERIAL_STORE_NAME, Duration.ofDays(10));
return Stores.sessionStoreBuilder(store,
Serdes.Long(),
keySpecificAvroSerde
);