我可以清空本地的 kafka state store
Can I empty the local kafka state store
目前我有 3 个具有 150 个分区的 kafka 代理。
我还有 3 个消费者,每个消费者都分配给一组分区。
每个消费者都有自己的本地状态存储和 rocksdb。这个内存中的键值存储在 grpc 调用期间被调用。在重新平衡期间(如果消费者消失),则数据将写入其他消费者的本地存储中。
如果消费者 运行 大约 2 周,则服务似乎 运行 内存不足。
有没有解决本地存储增长过多的问题?我们可以删除不再需要的分区数据吗?或者有没有办法在消费者恢复后删除存储的数据?
你可以使用 cleanUp();启动或关闭 Kafka Stream 以清理状态存储时的方法。
cleanUp()
Do a clean up of the local StateStore by deleting all data with regard
to the application ID. May only be called either before this
KafkaStreams instance is started in with calling start() method or
after the instance is closed by calling close() method.
KafkaStreams app = new KafkaStreams(builder.build(), props);
// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under
// certain conditions. See tip on `cleanUp()` below.
app.cleanUp();
app.start();
Note: To avoid the corresponding recovery overhead, you should not call
cleanUp() by default but only if you really need to. Otherwise, you wipe out local state and trigger an expensive state restoration. You
won't lose data and the program will still be correct, but you may
slow down startup significantly (depending on the size of your state)
如果您希望在 Kafka Stream 的生命周期中从状态存储中删除,您可以很好地从状态存储中删除,毕竟它只是 rocks B 中的地图存储集合
假设您使用的是 Kafka 流处理器
KeyValueStore<String, String> dsStore=(KeyValueStore<String, String>) context.getStateStore("localstorename");
KeyValueIterator<String, String> iter = this.dsStore.all();
while (iter.hasNext()) {
KeyValue<String, String> entry = iter.next();
dsStore.delete(entry.key);
}
目前我有 3 个具有 150 个分区的 kafka 代理。 我还有 3 个消费者,每个消费者都分配给一组分区。 每个消费者都有自己的本地状态存储和 rocksdb。这个内存中的键值存储在 grpc 调用期间被调用。在重新平衡期间(如果消费者消失),则数据将写入其他消费者的本地存储中。
如果消费者 运行 大约 2 周,则服务似乎 运行 内存不足。 有没有解决本地存储增长过多的问题?我们可以删除不再需要的分区数据吗?或者有没有办法在消费者恢复后删除存储的数据?
你可以使用 cleanUp();启动或关闭 Kafka Stream 以清理状态存储时的方法。
cleanUp()
Do a clean up of the local StateStore by deleting all data with regard to the application ID. May only be called either before this KafkaStreams instance is started in with calling start() method or after the instance is closed by calling close() method.
KafkaStreams app = new KafkaStreams(builder.build(), props);
// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under
// certain conditions. See tip on `cleanUp()` below.
app.cleanUp();
app.start();
Note: To avoid the corresponding recovery overhead, you should not call cleanUp() by default but only if you really need to. Otherwise, you wipe out local state and trigger an expensive state restoration. You won't lose data and the program will still be correct, but you may slow down startup significantly (depending on the size of your state)
如果您希望在 Kafka Stream 的生命周期中从状态存储中删除,您可以很好地从状态存储中删除,毕竟它只是 rocks B 中的地图存储集合
假设您使用的是 Kafka 流处理器
KeyValueStore<String, String> dsStore=(KeyValueStore<String, String>) context.getStateStore("localstorename");
KeyValueIterator<String, String> iter = this.dsStore.all();
while (iter.hasNext()) {
KeyValue<String, String> entry = iter.next();
dsStore.delete(entry.key);
}