How can I remove a record from a KafkaStreams PersistentKeyValueStore?
I put some records into a PersistentKeyValueStore:
Long nowValue = Optional.ofNullable(myStore.get(key)).orElse(0L) + newValue;
myStore.put(key, nowValue);
This works fine. I can see the records by printing them:
KeyValueIterator<String, Long> iter = myStore.all();
while (iter.hasNext()) {
    KeyValue<String, Long> entry = iter.next();
    System.out.println(entry.value);
}
iter.close();
But when I try to remove a record:
KeyValueIterator<String, Long> iter = myStore.all();
while (iter.hasNext()) {
    iter.remove();
}
iter.close();
I get an exception:
java.lang.UnsupportedOperationException: RocksDB iterator does not support remove()
at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.remove(RocksDBStore.java:515)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore$MeteredKeyValueIterator.remove(InnerMeteredKeyValueStore.java:306)
at com.loogtech.bi.compute.realtime.processor.AddCashProcessor$TotalCashAddPunctuator.punctuate(AddCashProcessor.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:131)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:263)
at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)
at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:619)
at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuate(AssignedTasks.java:430)
at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:324)
at org.apache.kafka.streams.processor.internals.StreamThread.punctuate(StreamThread.java:969)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
It also seems that I cannot remove a record with myStore.remove(key).
How can I delete a record by key?
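The KeyValueStore interface provides a #delete(...) method (not #remove(...)) for deleting a record from the store. As the exception message says, the RocksDB iterator does not support remove(), so delete each record by its key instead.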
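As a rough illustration of deleting by key rather than through the iterator, here is a minimal sketch. To keep it runnable without Kafka on the classpath, it uses a hypothetical in-memory stand-in (InMemoryStore, with a keys() helper that is not part of Kafka Streams) that only mirrors the get, put and delete method names of KeyValueStore. The class and method names DeleteByKeySketch and deleteAll are likewise assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DeleteByKeySketch {

    /** Hypothetical stand-in for the part of KeyValueStore used here (not the Kafka class). */
    static class InMemoryStore {
        private final Map<String, Long> data = new TreeMap<>();

        Long get(String key) {
            return data.get(key);
        }

        void put(String key, Long value) {
            data.put(key, value);
        }

        /** Like KeyValueStore#delete: removes the key and returns the old value, or null. */
        Long delete(String key) {
            return data.remove(key);
        }

        /** Assumed helper: a snapshot of the current keys, standing in for a pass over all(). */
        List<String> keys() {
            return new ArrayList<>(data.keySet());
        }
    }

    /** Deletes every record by key and returns how many records were removed. */
    static int deleteAll(InMemoryStore store) {
        int removed = 0;
        // Work from a snapshot of the keys and delete by key;
        // remove() is never called on an iterator.
        for (String key : store.keys()) {
            if (store.delete(key) != null) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.put("a", 1L);
        store.put("b", 2L);

        System.out.println(store.delete("a")); // old value of "a"
        System.out.println(deleteAll(store));  // number of remaining records removed
        System.out.println(store.get("b"));    // null once the record is gone
    }
}
```

With a real store, the same idea would be to read each KeyValue from myStore.all(), call myStore.delete(entry.key) for the keys to drop, and close the iterator afterwards. Collecting the keys before deleting is a conservative choice in this sketch, not something the answer above requires.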