如何创建由完成通量触发的单声道
How to create a Mono triggered by completion of a flux
我有一个 class 从 Flux 源填充地图。我希望能够 updateTheData()
即时,但我希望 getTheData()
能够 return 解析为当前或未决数据的 Mono。
基本上如果 theData != null 并且没有正在进行的通量,return Mono.just(theData)
否则 return mono 最终会发出数据。
编辑:这是迄今为止我能做的最好的
class SomeClass {
private Mono<Map<String, Data>> theData;
private final SomeFluxService someFluxService = new SomeFluxService();
public SomeClass() {
updateTheData();
}
public void updateTheData() {
someFluxService.get()
.collectMap(Data::getId, Function.identity())
.subscribe(d -> this.theData = Mono.just(d));
}
public Mono<Map<String, Data>> getTheData() {
return this.theData;
}
}
但是还有问题updateTheData()
第一次完成之前,getTheData()
会return null
感谢帮助!
But there is still the problem of before updateTheData()
completes for the first time, getTheData()
will return null
这是因为您在订阅方法中使用消费者仅在发出数据时更新 Mono
。这是一个有点奇怪的方法,最好简单地 cache()
和 Mono
然后立即将其分配给您的字段:
public void updateTheData() {
theData = someFluxService.get()
.collectMap(Data::getId, Function.identity())
.cache();
}
进一步思考,如果您的要求是每 x
秒更新一次数据,而不是固有地按需更新,您可以将其传递到缓存函数中并完全取消单独的方法:
public NewClass() {
theData = someFluxService.get()
.collectMap(Data::getId, Function.identity())
.cache(Duration.ofMinutes(5));
}
public Mono<Map<String, Data>> getTheData() {
return theData;
}
我有一个 class 从 Flux 源填充地图。我希望能够 updateTheData()
即时,但我希望 getTheData()
能够 return 解析为当前或未决数据的 Mono。
基本上如果 theData != null 并且没有正在进行的通量,return Mono.just(theData) 否则 return mono 最终会发出数据。
编辑:这是迄今为止我能做的最好的
class SomeClass {
private Mono<Map<String, Data>> theData;
private final SomeFluxService someFluxService = new SomeFluxService();
public SomeClass() {
updateTheData();
}
public void updateTheData() {
someFluxService.get()
.collectMap(Data::getId, Function.identity())
.subscribe(d -> this.theData = Mono.just(d));
}
public Mono<Map<String, Data>> getTheData() {
return this.theData;
}
}
但是还有问题updateTheData()
第一次完成之前,getTheData()
会return null
感谢帮助!
But there is still the problem of before
updateTheData()
completes for the first time,getTheData()
will return null
这是因为您在订阅方法中使用消费者仅在发出数据时更新 Mono
。这是一个有点奇怪的方法,最好简单地 cache()
和 Mono
然后立即将其分配给您的字段:
public void updateTheData() {
theData = someFluxService.get()
.collectMap(Data::getId, Function.identity())
.cache();
}
进一步思考,如果您的要求是每 x
秒更新一次数据,而不是固有地按需更新,您可以将其传递到缓存函数中并完全取消单独的方法:
public NewClass() {
theData = someFluxService.get()
.collectMap(Data::getId, Function.identity())
.cache(Duration.ofMinutes(5));
}
public Mono<Map<String, Data>> getTheData() {
return theData;
}