如何随状态转换?
How to transform with state?
我正在尝试使用新版本的 DSL (v1.0) 构建 Kafka 流应用程序,但我不知道如何配置有状态流转换。关于如何实现这一点的基本但完整的示例将非常有帮助。
我没有在 source code. According to the documentation 中找到任何(有状态的)转换示例,应遵循以下策略:
StateStoreSupplier myStore = Stores.create("myTransformState")
.withKeys(...)
.withValues(...)
.persistent() // optional
.build();
builder.addStore(myStore);
KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
但是,示例中builder
的类型不清楚,Topology
或StreamsBuilder
的none有一个方法addStore
.如果我尝试 addStateStore
而不是它只接受 StoreBuilder
类型的参数,这不是 myStore
定义的类型。
正如 JavaDocs 解释的那样,Stores#create
在 1.0.0 中已弃用:
@deprecated use persistentKeyValueStore(String), persistentWindowStore(String, long, int, long, boolean), persistentSessionStore(String, long), lruMap(String, int). or inMemoryKeyValueStore(String)
因此,在您的情况下,您将通过 Stores.persistentKeyValueStore("myTransformState")
创建持久键值存储供应商
第二步,您需要通过 Stores.keyValueStoreBuilder(...)
创建一个 StoreBuilder,它将您之前创建的商店供应商作为参数。
之后,您可以将 StoreBuilder
添加到您的构建器
StreamsBuilder#addStateStore(final StoreBuilder builder)
要将商店连接到您的变压器,您只需像以前一样提供商店名称作为附加参数。
我正在尝试使用新版本的 DSL (v1.0) 构建 Kafka 流应用程序,但我不知道如何配置有状态流转换。关于如何实现这一点的基本但完整的示例将非常有帮助。
我没有在 source code. According to the documentation 中找到任何(有状态的)转换示例,应遵循以下策略:
StateStoreSupplier myStore = Stores.create("myTransformState")
.withKeys(...)
.withValues(...)
.persistent() // optional
.build();
builder.addStore(myStore);
KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
但是,示例中builder
的类型不清楚,Topology
或StreamsBuilder
的none有一个方法addStore
.如果我尝试 addStateStore
而不是它只接受 StoreBuilder
类型的参数,这不是 myStore
定义的类型。
正如 JavaDocs 解释的那样,Stores#create
在 1.0.0 中已弃用:
@deprecated use persistentKeyValueStore(String), persistentWindowStore(String, long, int, long, boolean), persistentSessionStore(String, long), lruMap(String, int). or inMemoryKeyValueStore(String)
因此,在您的情况下,您将通过 Stores.persistentKeyValueStore("myTransformState")
第二步,您需要通过 Stores.keyValueStoreBuilder(...)
创建一个 StoreBuilder,它将您之前创建的商店供应商作为参数。
之后,您可以将 StoreBuilder
添加到您的构建器
StreamsBuilder#addStateStore(final StoreBuilder builder)
要将商店连接到您的变压器,您只需像以前一样提供商店名称作为附加参数。