How to create a Reactor hot stream in Spring?

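I have the following code. I thought that calling `publish()` would give me a hot stream, but the `uniqueID` is different on every connection. I want the create callback to run once and then act as a hot stream for any number of subscribers.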

    private Date lastChecked = Calendar.getInstance().getTime();

    @GetMapping(path = "/news/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Object> comments(@RequestParam(required=false) String date) throws ParseException {
        if(date != null) {
            lastChecked=new SimpleDateFormat("dd-MM-yyyy").parse(date);
        }
        return Flux.create(fluxSink -> {
            String uniqueID = UUID.randomUUID().toString();
            Flux.interval(Duration.ofSeconds(1))
                .map(s -> {
                    System.out.println(uniqueID);
                    // doOnNext forwards items only; doOnEach would also fire on completion, where Signal.get() returns null
                    this.newsRepository.findByTimestampGreaterThanOrderByTimestampDesc(lastChecked).take(5)
                        .doOnNext(fluxSink::next)
                        .subscribe();
                    lastChecked = Calendar.getInstance().getTime();
                    lastChecked.setTime(lastChecked.getTime() - 2000);
                    return s;
                })
                .subscribe();
        })
        .publishOn(Schedulers.parallel())
        .onBackpressureBuffer()
        .publish() //executing publish here 
        .autoConnect();
    }

I found a solution: use the `@PostConstruct` annotation so that the `ConnectableFlux` is created only once:

    private Date lastChecked = Calendar.getInstance().getTime();
    private ConnectableFlux<Object> flux;

    @PostConstruct
    public void load() {
        System.out.println("Loaded");
        this.flux = Flux.create(fluxSink -> {
            String uniqueID = UUID.randomUUID().toString();
            Flux.interval(Duration.ofSeconds(1))
                .map(s -> {
                    System.out.println(uniqueID);
                    // doOnNext forwards items only; doOnEach would also fire on completion, where Signal.get() returns null
                    this.newsRepository.findByTimestampGreaterThanOrderByTimestampDesc(lastChecked).take(5)
                        .doOnNext(fluxSink::next)
                        .subscribe();
                    lastChecked = Calendar.getInstance().getTime();
                    lastChecked.setTime(lastChecked.getTime() - 2000);
                    return s;
                })
                .subscribe();
        }).publishOn(Schedulers.parallel())
            .onBackpressureBuffer()
            .publish();
    }

    @GetMapping(path = "/news/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Object> comments(@RequestParam(required=false) String date) throws ParseException {

        return flux.autoConnect();
    }
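
For anyone wondering why this helps: `publish()` only shares one upstream subscription among the subscribers of the same `Flux` instance. In the first version the whole pipeline was assembled inside the handler method, so each request got its own `publish().autoConnect()` and therefore its own `uniqueID`. Below is a minimal sketch of that difference, not the controller code itself. It assumes `reactor-core` is on the classpath; the names `HotStreamSketch`, `source()`, `perCall()`, `SHARED` and `distinctIds()` are made up for illustration, and a 10 ms `Flux.interval` stands in for the repository polling.

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;

import reactor.core.publisher.Flux;

public class HotStreamSketch {

    // Cold source (hypothetical stand-in for the polling Flux.create block):
    // the defer lambda, and so the UUID, runs once per upstream subscription.
    public static Flux<String> source() {
        return Flux.defer(() -> {
            String uniqueId = UUID.randomUUID().toString();
            return Flux.interval(Duration.ofMillis(10))
                    .onBackpressureDrop() // drop ticks while nobody is listening
                    .map(tick -> uniqueId);
        });
    }

    // Assembled once, like the field initialised in @PostConstruct.
    public static final Flux<String> SHARED = source().publish().autoConnect();

    // Assembled on every call, like building the pipeline inside the handler method.
    // Note: autoConnect() never disconnects, so each call leaves a timer running.
    public static Flux<String> perCall() {
        return source().publish().autoConnect();
    }

    // Subscribes to both streams at once and counts the distinct ids they emit first.
    public static int distinctIds(Flux<String> first, Flux<String> second) {
        List<String> ids = Flux.merge(first.take(1), second.take(1))
                .collectList()
                .block(Duration.ofSeconds(5));
        return new HashSet<>(ids).size();
    }

    public static void main(String[] args) {
        System.out.println("shared:   " + distinctIds(SHARED, SHARED));
        System.out.println("per call: " + distinctIds(perCall(), perCall()));
    }
}
```

Run this way, the shared stream should report one distinct id and the per-call streams two. If the upstream should stop once the last subscriber leaves, `refCount()` is the usual alternative to `autoConnect()`.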