如何创建由完成通量触发的单声道

How to create a Mono triggered by completion of a flux

我有一个 class 从 Flux 源填充地图。我希望能够 updateTheData() 即时,但我希望 getTheData() 能够 return 解析为当前或未决数据的 Mono。

基本上如果 theData != null 并且没有正在进行的通量,return Mono.just(theData) 否则 return mono 最终会发出数据。

编辑:这是迄今为止我能做的最好的

    class SomeClass {
        private Mono<Map<String, Data>> theData;
        private final SomeFluxService someFluxService = new SomeFluxService();

        public SomeClass() {
            updateTheData();
        }

        public void updateTheData() {
            someFluxService.get()
                .collectMap(Data::getId, Function.identity())
                .subscribe(d -> this.theData = Mono.just(d));
        }

        public Mono<Map<String, Data>> getTheData() {
            return this.theData;
        }
    }

但是还有问题updateTheData()第一次完成之前,getTheData()会return null

感谢帮助!

But there is still the problem of before updateTheData() completes for the first time, getTheData() will return null

这是因为您在订阅方法中使用消费者仅在发出数据时更新 Mono。这是一个有点奇怪的方法,最好简单地 cache()Mono 然后立即将其分配给您的字段:

public void updateTheData() {
    theData = someFluxService.get()
            .collectMap(Data::getId, Function.identity())
            .cache();
}

进一步思考,如果您的要求是每 x 秒更新一次数据,而不是固有地按需更新,您可以将其传递到缓存函数中并完全取消单独的方法:

public NewClass() {
    theData = someFluxService.get()
            .collectMap(Data::getId, Function.identity())
            .cache(Duration.ofMinutes(5));
}

public Mono<Map<String, Data>> getTheData() {
    return theData;
}