如何从 Flux onComplete 发射
How to emit from Flux onComplete
我正在尝试实现类似于 Akka Streams statefulMapConcat 的东西...基本上我有一个类似这样的分数通量:
Score(LocalDate date, Integer score)
我想接收这些并每天发出一个聚合:
ScoreAggregate(LocalDate date, Integer scoreCount, Integer totalScore)
所以我有一个聚合器,它保留我在处理之前设置的一些内部状态,我想在那个 returns 一个 Mono 的聚合器上进行平面映射。如果日期发生变化,聚合器只会发出一个带有值的 Mono,因此您每天只能获得一个。
ScoreAggregator aggregator = ...
Flux<Score> scoreFlux = ...
scoreFlux.flatMap(aggregator::addScore)
所以我的问题是...如何在 scoreFlux
完成时发出最终元素?聚合器将有一些尚未发出的最后一天的数据,我需要发送这些数据。
回显评论作为答案只是为了不显示为未回答:
So my question is... how do I emit a final element when the scoreFlux completes?
您可以简单地使用 concatWith()
在您的原始流量完成后连接您想要的发布者。如果您只希望在原始发布者完成时对其进行 评估,请确保将其包装在 Mono.defer()
中,这将防止先发制人的执行。
我正在尝试实现类似于 Akka Streams statefulMapConcat 的东西...基本上我有一个类似这样的分数通量:
Score(LocalDate date, Integer score)
我想接收这些并每天发出一个聚合:
ScoreAggregate(LocalDate date, Integer scoreCount, Integer totalScore)
所以我有一个聚合器,它保留我在处理之前设置的一些内部状态,我想在那个 returns 一个 Mono 的聚合器上进行平面映射。如果日期发生变化,聚合器只会发出一个带有值的 Mono,因此您每天只能获得一个。
ScoreAggregator aggregator = ...
Flux<Score> scoreFlux = ...
scoreFlux.flatMap(aggregator::addScore)
所以我的问题是...如何在 scoreFlux
完成时发出最终元素?聚合器将有一些尚未发出的最后一天的数据,我需要发送这些数据。
回显评论作为答案只是为了不显示为未回答:
So my question is... how do I emit a final element when the scoreFlux completes?
您可以简单地使用 concatWith()
在您的原始流量完成后连接您想要的发布者。如果您只希望在原始发布者完成时对其进行 评估,请确保将其包装在 Mono.defer()
中,这将防止先发制人的执行。