Kafka 流处理器 API 清除状态存储

Kafka Streams Processor API clear state store

我正在使用 kafka 处理器 API 进行一些自定义计算。由于一些复杂的处理,DSL 并不是最合适的。流代码如下所示。

KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("storeName");
StoreBuilder<KeyValueStore<String, StoreObject>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier,
            Serdes.String(), storeObjectSerde);   
topology.addSource("SourceReadername", stringDeserializer, sourceSerde.deserializer(), "sourceTopic")
.addProcessor("processor", () -> new CustomProcessor("store"), FillReadername)
.addStateStore(storeBuilder, "processor") // define store for processor
.addSink("sinkName", "outputTopic", stringSerializer, resultSerde.serializer(),
                    Fill_PROCESSOR);

我需要根据来自单独主题的事件从状态存储中清除一些项目。我无法找到正确的方式来使用处理器 API 或其他方式来监听另一个主题中的事件以触发 CustomProcessor class 中的清理代码。 有没有办法在处理器 API 中获取另一个主题中的事件?或者可能将 DSL 与 Processor API 混合使用,以便能够将两者结合起来并将任何主题中的事件发送到 Process 方法,这样我就可以 运行 在清理中收到事件时的清理代码主题?

谢谢

您只需要添加另一个输入主题 (add:Source) 并添加从该主题转换消息的处理器,并根据它们从状态存储中删除员工。请注意,这些主题应该使用相同的键(因为分区)。