如何随状态转换?

How to transform with state?

我正在尝试使用新版本的 DSL (v1.0) 构建 Kafka 流应用程序,但我不知道如何配置有状态流转换。关于如何实现这一点的基本但完整的示例将非常有帮助。

我没有在 source code. According to the documentation 中找到任何(有状态的)转换示例,应遵循以下策略:

 StateStoreSupplier myStore = Stores.create("myTransformState")
 .withKeys(...)
 .withValues(...)
 .persistent() // optional
 .build();

 builder.addStore(myStore);

 KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");

但是,示例中builder的类型不清楚,TopologyStreamsBuilder的none有一个方法addStore .如果我尝试 addStateStore 而不是它只接受 StoreBuilder 类型的参数,这不是 myStore 定义的类型。

正如 JavaDocs 解释的那样,Stores#create 在 1.0.0 中已弃用:

@deprecated use persistentKeyValueStore(String), persistentWindowStore(String, long, int, long, boolean), persistentSessionStore(String, long), lruMap(String, int). or inMemoryKeyValueStore(String)

因此,在您的情况下,您将通过 Stores.persistentKeyValueStore("myTransformState")

创建持久键值存储供应商

第二步,您需要通过 Stores.keyValueStoreBuilder(...) 创建一个 StoreBuilder,它将您之前创建的商店供应商作为参数。

之后,您可以将 StoreBuilder 添加到您的构建器

StreamsBuilder#addStateStore(final StoreBuilder builder)

要将商店连接到您的变压器,您只需像以前一样提供商店名称作为附加参数。