如何使用处理器 API 访问 DSL 创建的 KTable/GlobalKTable?
How to access DSL-created KTable/GlobalKTable using Processor API?
我正在使用处理器 API (PAPI) 拓扑。
是否可以从处理器中访问使用 DSL 创建的 KTable(或 GlobalKTable)API(即使是只读的)?
即使用:
val builder = new StreamsBuilder()
val KTable = builder.table("topicname")
我得到了一个 KTable,但拓扑只允许您将 addStateStore 与 StoreBuilder 一起使用,而不是 KTable 本身。
.addStateStore(myStoreBuilder, MY_PROCESSOR_NAME)
所以我可以通过这样做来构建一个:
def keyValueStoreBuilder[K, V](storeName: String, keySerde: Serde[K], valueSerde: Serde[V]): StoreBuilder[KeyValueStore[K, V]] = {
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
keySerde,
valueSerde)
}
但是,在这种情况下如何干净利落地获得storeName
?
当您创建 KTable
时,它会自动在内部创建一个商店,并使用生成的名称。 (您可以通过 Topology#describe()
获取名称)。您还可以使用 Materialized
参数通过 table()
方法为商店指定名称。
我有点不清楚,你说的 "access a KTable within the Processor API" 是什么意思?如果你的意思是 "access the KTable store within a Processor
" 你可以使用 Topology#connectProcessorAndStateStores()
来让处理器访问商店。请注意,处理器永远不应写入 KTable 存储,因为 table()
运算符负责维护 table 的状态。如果您确实写入存储,则没有任何保证,如果发生故障,您可能会丢失数据。
我正在使用处理器 API (PAPI) 拓扑。
是否可以从处理器中访问使用 DSL 创建的 KTable(或 GlobalKTable)API(即使是只读的)?
即使用:
val builder = new StreamsBuilder()
val KTable = builder.table("topicname")
我得到了一个 KTable,但拓扑只允许您将 addStateStore 与 StoreBuilder 一起使用,而不是 KTable 本身。
.addStateStore(myStoreBuilder, MY_PROCESSOR_NAME)
所以我可以通过这样做来构建一个:
def keyValueStoreBuilder[K, V](storeName: String, keySerde: Serde[K], valueSerde: Serde[V]): StoreBuilder[KeyValueStore[K, V]] = {
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
keySerde,
valueSerde)
}
但是,在这种情况下如何干净利落地获得storeName
?
当您创建 KTable
时,它会自动在内部创建一个商店,并使用生成的名称。 (您可以通过 Topology#describe()
获取名称)。您还可以使用 Materialized
参数通过 table()
方法为商店指定名称。
我有点不清楚,你说的 "access a KTable within the Processor API" 是什么意思?如果你的意思是 "access the KTable store within a Processor
" 你可以使用 Topology#connectProcessorAndStateStores()
来让处理器访问商店。请注意,处理器永远不应写入 KTable 存储,因为 table()
运算符负责维护 table 的状态。如果您确实写入存储,则没有任何保证,如果发生故障,您可能会丢失数据。