How to get a sorted KeyValueStore from a KTable?
I want to materialize a KTable from a KStream, and I want the KeyValueStore to be sorted by key.
I looked through the KTable API specification (https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KTable.html), but no 'sort' method exists. I also looked at this article (https://dzone.com/articles/how-to-order-streamed-dataframes), which suggests implementing sorting through the Processor API. Is there any other way to achieve this?
Kafka Streams lets you materialize a queryable state store.
You can then get read-only access to that store by calling KafkaStreams#store().
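If you define a persistent store, Kafka Streams uses RocksDB to store your data. The returned KeyValueIterator instance is backed by a RocksDB iterator, which lets you iterate over the key-value pairs in sorted order (see the RocksDB iterator implementation).
Example: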
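    KafkaStreams streams = new KafkaStreams(topology, props);
    ReadOnlyKeyValueStore<Object, Object> store = streams.store("storeName", QueryableStoreTypes.keyValueStore());
    // Close the iterator when done; it holds resources of the underlying store.
    try (KeyValueIterator<Object, Object> iterator = store.all()) {
        iterator.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
    }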
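One thing to keep in mind about the sorted iteration described above: as far as I know, a RocksDB-backed store orders entries by the bytes of the serialized key, not by the natural order of the key type. The following is a minimal plain-Java sketch of that effect, without Kafka. The class ByteOrderDemo and the helper paddedKey are made up for illustration, and the sketch assumes keys are serialized as UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.TreeMap;

class ByteOrderDemo {
    // Unsigned lexicographic comparison of the serialized key bytes
    // (assumption: this mirrors how a byte-ordered store compares keys).
    private static final Comparator<byte[]> BYTE_WISE = (a, b) -> Arrays.compareUnsigned(a, b);

    // Returns the keys in the order a byte-wise sorted store would iterate them.
    static List<String> iterationOrder(List<String> keys) {
        TreeMap<byte[], String> sorted = new TreeMap<>(BYTE_WISE);
        for (String key : keys) {
            sorted.put(key.getBytes(StandardCharsets.UTF_8), key);
        }
        return new ArrayList<>(sorted.values());
    }

    // Hypothetical helper: zero-pads a non-negative id to a fixed width of 10 digits,
    // so that byte order and numeric order agree.
    static String paddedKey(long id) {
        return String.format(Locale.ROOT, "%010d", id);
    }

    public static void main(String[] args) {
        // Numeric strings of different length do not sort numerically.
        System.out.println(iterationOrder(List.of("9", "10", "100")));
        // Fixed-width keys do.
        System.out.println(iterationOrder(List.of(paddedKey(9), paddedKey(10), paddedKey(100))));
    }
}
```

So if the keys are numbers or timestamps, it is probably worth choosing a fixed-width key format (or a serde with an order-preserving encoding) before relying on the iteration order.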
Add the events to a StateStore under a key. The KeyValueIterator returned by the StateStore walks through the key-value pairs in sorted order.
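    import java.time.Duration;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Buffers incoming events in a state store and forwards them in key order
    // each time the wall-clock punctuator fires.
    public class SortProcessor extends AbstractProcessor<String, Event> {
        private static final Logger LOG = LoggerFactory.getLogger(SortProcessor.class);
        private final String stateStore;
        private final Long bufferIntervalInSeconds;
        // Why not use a simple Java NavigableMap? Check out my answer at :
        private KeyValueStore<String, Event> keyValueStore;

        public SortProcessor(String stateStore, Long bufferIntervalInSeconds) {
            this.stateStore = stateStore;
            this.bufferIntervalInSeconds = bufferIntervalInSeconds;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            keyValueStore = (KeyValueStore<String, Event>) context().getStateStore(stateStore);
            // Flush the buffer every bufferIntervalInSeconds of wall-clock time.
            context().schedule(Duration.ofSeconds(bufferIntervalInSeconds), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
        }

        // Forwards all buffered events in key order and removes them from the store.
        void punctuate(long timestamp) {
            LOG.info("Punctuator invoked...");
            try (KeyValueIterator<String, Event> iterator = keyValueStore.all()) {
                while (iterator.hasNext()) {
                    KeyValue<String, Event> next = iterator.next();
                    if (next.value == null) {
                        continue;
                    }
                    LOG.info("Sending {}", next.key);
                    context().forward(null, next.value);
                    keyValueStore.delete(next.key);
                }
            }
        }

        @Override
        public void process(String key, Event value) {
            Event event = Event.builder(value).payload(value.getPayload().toUpperCase()).build();
            // The store key "<eventType> <id>" determines the order in which events are forwarded.
            keyValueStore.put(event.getEventType().name() + " " + event.getId(), event);
        }

        public static String getName() {
            return "sort-processor";
        }
    }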
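The executable code is here. I have used a simple in-memory state store here. If you expect a large number of events in a short period of time, you can use a persistent state store, as already suggested in the other answer.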
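To get a feel for the order in which the buffered events come out, the buffer-and-flush idea can be tried without a Kafka cluster. The sketch below is only a plain-Java stand-in and not the processor itself: a TreeMap plays the role of the state store, flush() plays the role of punctuate(), and the class name, event types, ids and payloads are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class SortBufferSketch {
    // Stand-in for the sorted state store (assumption: String keys, lexicographic order).
    private final TreeMap<String, String> buffer = new TreeMap<>();

    // Mirrors process(): buffer the upper-cased payload under "<eventType> <id>".
    void process(String eventType, String id, String payload) {
        buffer.put(eventType + " " + id, payload.toUpperCase(Locale.ROOT));
    }

    // Mirrors punctuate(): emit every buffered entry in key order and remove it.
    List<String> flush() {
        List<String> forwarded = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            forwarded.add(entry.getKey() + "=" + entry.getValue());
            it.remove();
        }
        return forwarded;
    }

    public static void main(String[] args) {
        SortBufferSketch sketch = new SortBufferSketch();
        // Events arrive out of order...
        sketch.process("UPDATE", "2", "b");
        sketch.process("CREATE", "7", "c");
        sketch.process("CREATE", "1", "a");
        // ...and leave sorted by event type first, then by id.
        System.out.println(sketch.flush());
    }
}
```

Events are only sorted within one flush interval, so anything that arrives after a flush ends up in the next batch regardless of its key.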