Spring Webflux - 没有订阅者的初始消息

Spring Webflux - initial message without subscriber

我正在尝试使用 Webflux 制作一个 SSE Spring 应用程序。根据文档,如果没有订阅者,则消息不会发送到接收器。在我的用例中,我希望订阅者在调用订阅时收到最后一条消息。我发现 Sink 可以通过以下方式配置:

Sinks.many().replay().latest();

当我同时拥有发布者和订阅者时,下一个订阅者调用订阅时,他会收到最后发送的消息,这很棒。但是,如果我没有任何订阅者,发布者会发送消息,然后第一个订阅者进来,它会收到 none。正如上面的文档所说的那样,但我正在考虑如何解决该问题以满足我的需求。作为解决方法,我做了这样的事情:

if (shareSinks.currentSubscriberCount() == 0) {
  shareSinks.asFlux().subscribe();
}
shareSinks.tryEmitNext(shareDTO);

但是为发布者订阅它自己的订阅听起来不是一个干净的方法...

这是一个冷热出版商的问题。目前,您的发布者 (Sinks.many().replay().latest()) 是冷发布者。在没有订阅者的情况下发出的事件将消失。

您需要的是所谓的热门发布商。热门发布者缓存事件,新订阅者将接收所有以前缓存的事件。

这样做就可以了:

        final Sinks.Many<String> shareSinks = Sinks.many()
                .replay()
                .all(); // or .limit(10); to keep only the last 10 emissions

        final Flux<String> hotPublisher = shareSinks.asFlux()
                .cache(); // .cache() turns the cold flux into a 
                          // hot flux

        shareSinks.tryEmitNext("1");
        shareSinks.tryEmitNext("2");
        shareSinks.tryEmitNext("3");
        shareSinks.tryEmitNext("4");
        hotPublisher.subscribe(message -> System.out.println("received: " + message));

控制台打印输出为:

received: 1
received: 2
received: 3
received: 4

Reactor 文档也有一章是关于 hot vs. cold