我如何确定密钥库已在 Spring Cloud Stream API 中更新
How can i determine that a Keystore is updated in Spring Cloud Stream API
我使用来自 spring 云流 api 的聚合函数从主题创建实体化视图。
这看起来像下面这样:
public void process(KStream<Object, Object> input){
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)
然后我用以下方法查询我创建的 Statestore:
ReadOnlyWindowStore<Object, Object> windowStore =
queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());
现在我的问题是,在 process 方法处理新事件后,我如何确定此状态存储已更新?他们是我可以听的某种活动还是我可以创建一个?
您的程序是:
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)
其实最后aggregate()
returns一个KTable
对象。如果您通过 Materialized
禁用缓存,您可以通过以下方式了解 KTable
的每一次更新:
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized) // disable caching via Materialized
.toStream()
.foreach(...) // react to every update to the KTable
我使用来自 spring 云流 api 的聚合函数从主题创建实体化视图。 这看起来像下面这样:
public void process(KStream<Object, Object> input){
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)
然后我用以下方法查询我创建的 Statestore:
ReadOnlyWindowStore<Object, Object> windowStore =
queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());
现在我的问题是,在 process 方法处理新事件后,我如何确定此状态存储已更新?他们是我可以听的某种活动还是我可以创建一个?
您的程序是:
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)
其实最后aggregate()
returns一个KTable
对象。如果您通过 Materialized
禁用缓存,您可以通过以下方式了解 KTable
的每一次更新:
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized) // disable caching via Materialized
.toStream()
.foreach(...) // react to every update to the KTable