如何在 Spring 中创建 Reactor 热流(hot stream)?
How to create a reactor hot stream in spring?
我有以下代码。我原以为调用 publish() 就可以创建一个热流(hot stream),但是每个连接上打印出的 uniqueId 都不同。我希望传给 Flux.create 的回调只执行一次,之后作为热流运行,供任意数量的订阅者共享。
// Lower bound ("newer than") passed to the next repository query. The field is shared by all
// requests and is written both by the thread invoking the handler and by the thread running
// the interval callback, so it is volatile to make every write visible to the other threads.
private volatile Date lastChecked = Calendar.getInstance().getTime();

/**
 * Streams news as server-sent events by polling the repository once per second.
 *
 * <p>NOTE: the whole pipeline, including {@code publish().autoConnect()}, is assembled inside
 * this handler. Every call therefore builds and connects its own pipeline, so the
 * {@code Flux.create} callback runs once per request and each request prints a different
 * {@code uniqueID}. The stream is not shared between requests.
 *
 * @param date optional lower bound in {@code dd-MM-yyyy} format; when present it replaces the
 *             shared {@code lastChecked} value (NOTE(review): this affects every pipeline
 *             that reads the field, not only this request)
 * @return a flux emitting the news items pushed into the sink by the poller
 * @throws ParseException if {@code date} is not in {@code dd-MM-yyyy} format
 */
@GetMapping(path = "/news/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> comments(@RequestParam(required=false) String date) throws ParseException {
    if (date != null) {
        lastChecked = new SimpleDateFormat("dd-MM-yyyy").parse(date);
    }
    return Flux.create(fluxSink -> {
                String uniqueID = UUID.randomUUID().toString();
                // Register the poller with the sink so it is disposed when the sink terminates
                // or is cancelled, instead of being an untracked fire-and-forget subscription.
                fluxSink.onDispose(
                        Flux.interval(Duration.ofSeconds(1))
                                // doOnNext rather than map: the tick is only a trigger for side effects.
                                .doOnNext(tick -> {
                                    System.out.println(uniqueID);
                                    this.newsRepository
                                            .findByTimestampGreaterThanOrderByTimestampDesc(lastChecked)
                                            .take(5)
                                            // Forward values only. doOnEach also fires for the
                                            // completion/error signal, whose get() is null, which
                                            // would call fluxSink.next(null) on every poll.
                                            .doOnNext(fluxSink::next)
                                            .subscribe();
                                    // Next poll starts 2 seconds in the past. Build the Date fully
                                    // before storing it so readers never see a half-adjusted value.
                                    lastChecked = new Date(System.currentTimeMillis() - 2000);
                                })
                                .subscribe());
            })
            .publishOn(Schedulers.parallel())
            .onBackpressureBuffer()
            .publish() //executing publish here
            .autoConnect();
}
我找到了解决方案:使用 @PostConstruct 注解,只创建一次 ConnectableFlux:
// Lower bound ("newer than") passed to the next repository query; advanced after every poll.
private Date lastChecked = Calendar.getInstance().getTime();
// The single shared pipeline, assembled once in load().
private ConnectableFlux<Object> flux;

/**
 * Builds the shared news pipeline exactly once, after the bean has been constructed.
 *
 * <p>Because the {@code ConnectableFlux} is created here rather than inside the request
 * handler, the {@code Flux.create} callback (and its {@code uniqueID}) belongs to this one
 * pipeline instead of being re-created for every request.
 */
@PostConstruct
public void load()
{
    System.out.println("Loaded");
    this.flux = Flux.create(fluxSink -> {
                String uniqueID = UUID.randomUUID().toString();
                // Register the poller with the sink so it is disposed when the sink terminates
                // or is cancelled, instead of being an untracked fire-and-forget subscription.
                fluxSink.onDispose(
                        Flux.interval(Duration.ofSeconds(1))
                                // doOnNext rather than map: the tick is only a trigger for side effects.
                                .doOnNext(tick -> {
                                    System.out.println(uniqueID);
                                    this.newsRepository
                                            .findByTimestampGreaterThanOrderByTimestampDesc(lastChecked)
                                            .take(5)
                                            // Forward values only. doOnEach also fires for the
                                            // completion/error signal, whose get() is null, which
                                            // would call fluxSink.next(null) on every poll.
                                            .doOnNext(fluxSink::next)
                                            .subscribe();
                                    // Next poll starts 2 seconds in the past. Build the Date fully
                                    // before storing it instead of mutating it after assignment.
                                    lastChecked = new Date(System.currentTimeMillis() - 2000);
                                })
                                .subscribe());
            })
            .publishOn(Schedulers.parallel())
            .onBackpressureBuffer()
            .publish();
}
/**
 * Serves the news stream as server-sent events.
 *
 * @param date accepted as an optional request parameter but not read by this method
 * @return the {@code flux} field wrapped with {@code autoConnect()}
 * @throws ParseException declared in the signature; nothing in this body throws it
 */
@GetMapping(path = "/news/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> comments(@RequestParam(required=false) String date) throws ParseException {
    final Flux<Object> sharedStream = flux.autoConnect();
    return sharedStream;
}
我有以下代码。我原以为调用 publish() 就可以创建一个热流(hot stream),但是每个连接上打印出的 uniqueId 都不同。我希望传给 Flux.create 的回调只执行一次,之后作为热流运行,供任意数量的订阅者共享。
// Lower bound ("newer than") passed to the next repository query. The field is shared by all
// requests and is written both by the thread invoking the handler and by the thread running
// the interval callback, so it is volatile to make every write visible to the other threads.
private volatile Date lastChecked = Calendar.getInstance().getTime();

/**
 * Streams news as server-sent events by polling the repository once per second.
 *
 * <p>NOTE: the whole pipeline, including {@code publish().autoConnect()}, is assembled inside
 * this handler. Every call therefore builds and connects its own pipeline, so the
 * {@code Flux.create} callback runs once per request and each request prints a different
 * {@code uniqueID}. The stream is not shared between requests.
 *
 * @param date optional lower bound in {@code dd-MM-yyyy} format; when present it replaces the
 *             shared {@code lastChecked} value (NOTE(review): this affects every pipeline
 *             that reads the field, not only this request)
 * @return a flux emitting the news items pushed into the sink by the poller
 * @throws ParseException if {@code date} is not in {@code dd-MM-yyyy} format
 */
@GetMapping(path = "/news/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> comments(@RequestParam(required=false) String date) throws ParseException {
    if (date != null) {
        lastChecked = new SimpleDateFormat("dd-MM-yyyy").parse(date);
    }
    return Flux.create(fluxSink -> {
                String uniqueID = UUID.randomUUID().toString();
                // Register the poller with the sink so it is disposed when the sink terminates
                // or is cancelled, instead of being an untracked fire-and-forget subscription.
                fluxSink.onDispose(
                        Flux.interval(Duration.ofSeconds(1))
                                // doOnNext rather than map: the tick is only a trigger for side effects.
                                .doOnNext(tick -> {
                                    System.out.println(uniqueID);
                                    this.newsRepository
                                            .findByTimestampGreaterThanOrderByTimestampDesc(lastChecked)
                                            .take(5)
                                            // Forward values only. doOnEach also fires for the
                                            // completion/error signal, whose get() is null, which
                                            // would call fluxSink.next(null) on every poll.
                                            .doOnNext(fluxSink::next)
                                            .subscribe();
                                    // Next poll starts 2 seconds in the past. Build the Date fully
                                    // before storing it so readers never see a half-adjusted value.
                                    lastChecked = new Date(System.currentTimeMillis() - 2000);
                                })
                                .subscribe());
            })
            .publishOn(Schedulers.parallel())
            .onBackpressureBuffer()
            .publish() //executing publish here
            .autoConnect();
}
我找到了解决方案:使用 @PostConstruct 注解,只创建一次 ConnectableFlux:
// Lower bound ("newer than") passed to the next repository query; advanced after every poll.
private Date lastChecked = Calendar.getInstance().getTime();
// The single shared pipeline, assembled once in load().
private ConnectableFlux<Object> flux;

/**
 * Builds the shared news pipeline exactly once, after the bean has been constructed.
 *
 * <p>Because the {@code ConnectableFlux} is created here rather than inside the request
 * handler, the {@code Flux.create} callback (and its {@code uniqueID}) belongs to this one
 * pipeline instead of being re-created for every request.
 */
@PostConstruct
public void load()
{
    System.out.println("Loaded");
    this.flux = Flux.create(fluxSink -> {
                String uniqueID = UUID.randomUUID().toString();
                // Register the poller with the sink so it is disposed when the sink terminates
                // or is cancelled, instead of being an untracked fire-and-forget subscription.
                fluxSink.onDispose(
                        Flux.interval(Duration.ofSeconds(1))
                                // doOnNext rather than map: the tick is only a trigger for side effects.
                                .doOnNext(tick -> {
                                    System.out.println(uniqueID);
                                    this.newsRepository
                                            .findByTimestampGreaterThanOrderByTimestampDesc(lastChecked)
                                            .take(5)
                                            // Forward values only. doOnEach also fires for the
                                            // completion/error signal, whose get() is null, which
                                            // would call fluxSink.next(null) on every poll.
                                            .doOnNext(fluxSink::next)
                                            .subscribe();
                                    // Next poll starts 2 seconds in the past. Build the Date fully
                                    // before storing it instead of mutating it after assignment.
                                    lastChecked = new Date(System.currentTimeMillis() - 2000);
                                })
                                .subscribe());
            })
            .publishOn(Schedulers.parallel())
            .onBackpressureBuffer()
            .publish();
}
/**
 * Serves the news stream as server-sent events.
 *
 * @param date accepted as an optional request parameter but not read by this method
 * @return the {@code flux} field wrapped with {@code autoConnect()}
 * @throws ParseException declared in the signature; nothing in this body throws it
 */
@GetMapping(path = "/news/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> comments(@RequestParam(required=false) String date) throws ParseException {
    final Flux<Object> sharedStream = flux.autoConnect();
    return sharedStream;
}