Spring Webflux - 没有订阅者的初始消息
Spring Webflux - initial message without subscriber
我正在尝试使用 Webflux 制作一个 SSE Spring 应用程序。根据文档,如果没有订阅者,则消息不会发送到接收器。在我的用例中,我希望订阅者在调用订阅时收到最后一条消息。我发现 Sink 可以通过以下方式配置:
Sinks.many().replay().latest();
当我同时拥有发布者和订阅者时,下一个订阅者调用订阅时,他会收到最后发送的消息,这很棒。但是,如果我没有任何订阅者,发布者会发送消息,然后第一个订阅者进来,它会收到 none。正如上面的文档所说的那样,但我正在考虑如何解决该问题以满足我的需求。作为解决方法,我做了这样的事情:
if (shareSinks.currentSubscriberCount() == 0) {
shareSinks.asFlux().subscribe();
}
shareSinks.tryEmitNext(shareDTO);
但是为发布者订阅它自己的订阅听起来不是一个干净的方法...
这是一个冷热出版商的问题。目前,您的发布者 (Sinks.many().replay().latest()
) 是冷发布者。在没有订阅者的情况下发出的事件将消失。
您需要的是所谓的热门发布商。热门发布者缓存事件,新订阅者将接收所有以前缓存的事件。
这样做就可以了:
final Sinks.Many<String> shareSinks = Sinks.many()
.replay()
.all(); // or .limit(10); to keep only the last 10 emissions
final Flux<String> hotPublisher = shareSinks.asFlux()
.cache(); // .cache() turns the cold flux into a
// hot flux
shareSinks.tryEmitNext("1");
shareSinks.tryEmitNext("2");
shareSinks.tryEmitNext("3");
shareSinks.tryEmitNext("4");
hotPublisher.subscribe(message -> System.out.println("received: " + message));
控制台打印输出为:
received: 1
received: 2
received: 3
received: 4
Reactor 文档也有一章是关于 hot vs. cold。
我正在尝试使用 Webflux 制作一个 SSE Spring 应用程序。根据文档,如果没有订阅者,则消息不会发送到接收器。在我的用例中,我希望订阅者在调用订阅时收到最后一条消息。我发现 Sink 可以通过以下方式配置:
Sinks.many().replay().latest();
当我同时拥有发布者和订阅者时,下一个订阅者调用订阅时,他会收到最后发送的消息,这很棒。但是,如果我没有任何订阅者,发布者会发送消息,然后第一个订阅者进来,它会收到 none。正如上面的文档所说的那样,但我正在考虑如何解决该问题以满足我的需求。作为解决方法,我做了这样的事情:
if (shareSinks.currentSubscriberCount() == 0) {
shareSinks.asFlux().subscribe();
}
shareSinks.tryEmitNext(shareDTO);
但是为发布者订阅它自己的订阅听起来不是一个干净的方法...
这是一个冷热出版商的问题。目前,您的发布者 (Sinks.many().replay().latest()
) 是冷发布者。在没有订阅者的情况下发出的事件将消失。
您需要的是所谓的热门发布商。热门发布者缓存事件,新订阅者将接收所有以前缓存的事件。
这样做就可以了:
final Sinks.Many<String> shareSinks = Sinks.many()
.replay()
.all(); // or .limit(10); to keep only the last 10 emissions
final Flux<String> hotPublisher = shareSinks.asFlux()
.cache(); // .cache() turns the cold flux into a
// hot flux
shareSinks.tryEmitNext("1");
shareSinks.tryEmitNext("2");
shareSinks.tryEmitNext("3");
shareSinks.tryEmitNext("4");
hotPublisher.subscribe(message -> System.out.println("received: " + message));
控制台打印输出为:
received: 1
received: 2
received: 3
received: 4
Reactor 文档也有一章是关于 hot vs. cold。