如何多次消耗无限通量
How to consume infinite flux multiple times
这就是我要实现的目标:
当有人请求 http://localhost/runIt
时,我想 return 从每 6 秒刷新一次的缓存中获取数据。下面,我有一个通量(始终与存储在地图中的相同)第一次实例化并开始发射数字 0,1,2,3,4... 到无穷大。
是否可以在第一次请求时Spring MVC 控制器方法 return "1,2"
,然后在 7 秒后请求 return "3,4"
等 ?
此外,如果lastRunIt
60 秒未更新,我将需要终止通量。
下面这段代码是我想到的,但目前根本不起作用。
Map<Integer, Flux<String>> itos = new ConcurrentHashMap<>();
Instant lastRunIt;
@GetMapping("runIt")
public Flux<String> runIt(){
lastRunIt = Instant.now();
return itos.computeIfAbsent(1, k ->
Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
.doOnNext(x -> {
//dispose if no request for 60 seconds
if(lastRunIt.plusSeconds(60).isBefore(Instant.now())){
//someDispisable.dispose(); //<--- HOW TO GET Disposable here?
}
System.out.println(x);
})
.cache(Duration.ofSeconds(6))
);
}
好的,我设法做了一些看起来有效的事情。想知道它是否可以这样使用,或者这里是否存在资源过度使用的可能性,或者更紧凑的方式来表示它?
Map<Integer, Flux<String>> itos = new ConcurrentHashMap<>();
Instant lastRunIt;
Subscription subskrip;
@GetMapping("runIt")
public Flux<String> runIt(){
lastRunIt = Instant.now();
return itos.computeIfAbsent(1, k -> {
return Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
.doOnSubscribe(sb -> {
subskrip = sb; //save subscription first time this is called
})
.doOnNext(x -> {
//dispose if no request for 10 seconds
if(lastRunIt.plusSeconds(10).isBefore(Instant.now())){
System.out.println("DISPOSINGGG");
subskrip.cancel(); //cancel this flux
itos.remove(1); //remove from map
}
System.out.println(x);
})
.cache(Duration.ofSeconds(9))
.take(3) //on every REST request take only 3 items that are already in cache
.map(x -> ">" + x);
});
}
这就是我要实现的目标:
当有人请求 http://localhost/runIt
时,我想 return 从每 6 秒刷新一次的缓存中获取数据。下面,我有一个通量(始终与存储在地图中的相同)第一次实例化并开始发射数字 0,1,2,3,4... 到无穷大。
是否可以在第一次请求时Spring MVC 控制器方法 return "1,2"
,然后在 7 秒后请求 return "3,4"
等 ?
此外,如果lastRunIt
60 秒未更新,我将需要终止通量。
下面这段代码是我想到的,但目前根本不起作用。
Map<Integer, Flux<String>> itos = new ConcurrentHashMap<>();
Instant lastRunIt;
@GetMapping("runIt")
public Flux<String> runIt(){
lastRunIt = Instant.now();
return itos.computeIfAbsent(1, k ->
Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
.doOnNext(x -> {
//dispose if no request for 60 seconds
if(lastRunIt.plusSeconds(60).isBefore(Instant.now())){
//someDispisable.dispose(); //<--- HOW TO GET Disposable here?
}
System.out.println(x);
})
.cache(Duration.ofSeconds(6))
);
}
好的,我设法做了一些看起来有效的事情。想知道它是否可以这样使用,或者这里是否存在资源过度使用的可能性,或者更紧凑的方式来表示它?
Map<Integer, Flux<String>> itos = new ConcurrentHashMap<>();
Instant lastRunIt;
Subscription subskrip;
@GetMapping("runIt")
public Flux<String> runIt(){
lastRunIt = Instant.now();
return itos.computeIfAbsent(1, k -> {
return Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
.doOnSubscribe(sb -> {
subskrip = sb; //save subscription first time this is called
})
.doOnNext(x -> {
//dispose if no request for 10 seconds
if(lastRunIt.plusSeconds(10).isBefore(Instant.now())){
System.out.println("DISPOSINGGG");
subskrip.cancel(); //cancel this flux
itos.remove(1); //remove from map
}
System.out.println(x);
})
.cache(Duration.ofSeconds(9))
.take(3) //on every REST request take only 3 items that are already in cache
.map(x -> ">" + x);
});
}