KTable 状态存储持久化
KTable state store persistence
如果我在具体化 KTable 时使用持久存储,状态存储会在应用程序重新启动时保持不变吗?例如,如果我使用以下内容:
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("queryable-store-name");
KTable<Long,String> table = builder.table(
"foo",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
状态存储 "queryable-store-name" 是否可以在重新启动时使用之前运行的状态访问?比方说,我向主题 foo 发送了 50 条记录,它在状态存储中得到了具体化。然后应用程序重新启动,状态存储中还会有那 50 条记录吗?如果没有,有没有办法实现这一目标?
谢谢!
是的,状态存储默认保存在磁盘上。当应用程序重新启动且 application-id
未更改时,状态将从磁盘恢复,包含所有 50 条记录。当应用程序为 killed/stopped/restarted.
时,将从偏移量添加新记录
编辑:
似乎您缺少 KTable 之上的聚合操作,这是必需的。请参阅我的代码示例:
final KStream<CustomerKey, ViewPage> viewPagesStream=builder.stream(INPUT_TOPIC);
final KTable<Windowed<ViewPageCountKey>,Long>uniqueViewPageCount=viewPagesStream
.map((key,value)->{
ViewPageCountKey newKey=new ViewPageCountKey(key.getProjectId(),value.getUrl());
return new KeyValue<>(newKey,value);
})
.filter((key,value)->key!=null)
.groupByKey()
.count(TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE),STORE_NAME);
如果我在具体化 KTable 时使用持久存储,状态存储会在应用程序重新启动时保持不变吗?例如,如果我使用以下内容:
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("queryable-store-name");
KTable<Long,String> table = builder.table(
"foo",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
状态存储 "queryable-store-name" 是否可以在重新启动时使用之前运行的状态访问?比方说,我向主题 foo 发送了 50 条记录,它在状态存储中得到了具体化。然后应用程序重新启动,状态存储中还会有那 50 条记录吗?如果没有,有没有办法实现这一目标?
谢谢!
是的,状态存储默认保存在磁盘上。当应用程序重新启动且 application-id
未更改时,状态将从磁盘恢复,包含所有 50 条记录。当应用程序为 killed/stopped/restarted.
编辑: 似乎您缺少 KTable 之上的聚合操作,这是必需的。请参阅我的代码示例:
final KStream<CustomerKey, ViewPage> viewPagesStream=builder.stream(INPUT_TOPIC);
final KTable<Windowed<ViewPageCountKey>,Long>uniqueViewPageCount=viewPagesStream
.map((key,value)->{
ViewPageCountKey newKey=new ViewPageCountKey(key.getProjectId(),value.getUrl());
return new KeyValue<>(newKey,value);
})
.filter((key,value)->key!=null)
.groupByKey()
.count(TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE),STORE_NAME);