如何从 Flux onComplete 发射

How to emit from Flux onComplete

我正在尝试实现类似于 Akka Streams statefulMapConcat 的东西...基本上我有一个类似这样的分数通量:

Score(LocalDate date, Integer score)

我想接收这些并每天发出一个聚合:

ScoreAggregate(LocalDate date, Integer scoreCount, Integer totalScore)

所以我有一个聚合器,它保留我在处理之前设置的一些内部状态,我想在那个 returns 一个 Mono 的聚合器上进行平面映射。如果日期发生变化,聚合器只会发出一个带有值的 Mono,因此您每天只能获得一个。

ScoreAggregator aggregator = ...

Flux<Score> scoreFlux = ...

scoreFlux.flatMap(aggregator::addScore)

所以我的问题是...如何在 scoreFlux 完成时发出最终元素?聚合器将有一些尚未发出的最后一天的数据,我需要发送这些数据。

回显评论作为答案只是为了不显示为未回答:

So my question is... how do I emit a final element when the scoreFlux completes?

您可以简单地使用 concatWith() 在您的原始流量完成后连接您想要的发布者。如果您只希望在原始发布者完成时对其进行 评估,请确保将其包装在 Mono.defer() 中,这将防止先发制人的执行。