如何使用处理器 API 访问 DSL 创建的 KTable/GlobalKTable?

How to access DSL-created KTable/GlobalKTable using Processor API?

我正在使用处理器 API (PAPI) 拓扑。

是否可以从处理器中访问使用 DSL 创建的 KTable(或 GlobalKTable)API(即使是只读的)?

即使用:

val builder = new StreamsBuilder()
val KTable = builder.table("topicname")

我得到了一个 KTable,但拓扑只允许您将 addStateStore 与 StoreBuilder 一起使用,而不是 KTable 本身。

.addStateStore(myStoreBuilder, MY_PROCESSOR_NAME)

所以我可以通过这样做来构建一个:

def keyValueStoreBuilder[K, V](storeName: String, keySerde: Serde[K], valueSerde: Serde[V]): StoreBuilder[KeyValueStore[K, V]] = {
Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore(storeName),
  keySerde,
  valueSerde)

}

但是,在这种情况下如何干净利落地获得storeName

当您创建 KTable 时,它会自动在内部创建一个商店,并使用生成的名称。 (您可以通过 Topology#describe() 获取名称)。您还可以使用 Materialized 参数通过 table() 方法为商店指定名称。

我有点不清楚,你说的 "access a KTable within the Processor API" 是什么意思?如果你的意思是 "access the KTable store within a Processor" 你可以使用 Topology#connectProcessorAndStateStores() 来让处理器访问商店。请注意,处理器永远不应写入 KTable 存储,因为 table() 运算符负责维护 table 的状态。如果您确实写入存储,则没有任何保证,如果发生故障,您可能会丢失数据。