我如何确定密钥库已在 Spring Cloud Stream API 中更新

How can i determine that a Keystore is updated in Spring Cloud Stream API

我使用来自 spring 云流 api 的聚合函数从主题创建实体化视图。 这看起来像下面这样:

public void process(KStream<Object, Object> input){
input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized)

然后我用以下方法查询我创建的 Statestore:

 ReadOnlyWindowStore<Object, Object> windowStore =
  queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());

现在我的问题是,在 process 方法处理新事件后,我如何确定此状态存储已更新?他们是我可以听的某种活动还是我可以创建一个?

您的程序是:

input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized)

其实最后aggregate() returns一个KTable对象。如果您通过 Materialized 禁用缓存,您可以通过以下方式了解 KTable 的每一次更新:

input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized) // disable caching via Materialized
  .toStream()
  .foreach(...) // react to every update to the KTable