Spring WebFlux 休息控制器只服务于前两个订阅
Spring WebFlux rest controller serves only first two subscriptions
Flux.create
方法以编程方式创建了一个通量:
Flux<Tweet> flux = Flux.<Tweet>create(emitter -> ...);
有一个休息控制器:
@RestController
public class StreamController {
...
@GetMapping("/top-words")
public Flux<TopWords> streamTopWords() {
return topWordsStream.getTopWords();
}
}
有几个 Web 客户端(在独立进程中):
Flux<TopWords> topWordsFlux = WebClient.create(".../top-words")
.method(HttpMethod.GET)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(TopWords.class)
.subscribe(System.out::println);
JavaScript 中有几个 EventSource 实例:
var eventSource = new EventSource(".../top-words");
eventSource.onmessage = function (e) {
console.log("Processing message: ", e.data);
};
只有前两个 "subscribers" 会开始接收消息(无论是 Web 客户端还是 EventSource 实例)。另一个将打开连接,获取 HTTP 200 状态,但事件流保持为空。客户端和服务器端都没有错误。
我不明白,“2 个订阅者”的限制在哪里。如果我想支持超过 2 个订阅者,我该怎么办?
该应用程序使用 Spring Boot 2.0.0.RELEASE 构建,并使用 spring-boot-starter-webflux 自动配置。默认配置不变。
我尝试调整的底层 API 存在限制(Twitter 流式传输 API)。
目标是连接到 Twitter 一次并处理各种不同订阅者的推文流。
最初我认为传递给 Flux.create
方法的发射器总是对所有订阅者使用相同的 FluxSink
。那当然没有意义。相反,FluxSink
被提供 per-subscriber,正如 javadoc 明确指出的那样。
我使用 Twitter 侦听器实现了我的用例,该侦听器允许注册(和 un-registration)多个 FluxSink
实例。这样,单个推文流可以被各种不同的订阅者订阅。
Flux<Tweet> flux = Flux.<Tweet>create(twitterListener::addSink);
我的 twitterListener
从 spring-social-twitter 项目实施 org.springframework.social.twitter.api.StreamListener
。
Flux.create
方法以编程方式创建了一个通量:
Flux<Tweet> flux = Flux.<Tweet>create(emitter -> ...);
有一个休息控制器:
@RestController
public class StreamController {
...
@GetMapping("/top-words")
public Flux<TopWords> streamTopWords() {
return topWordsStream.getTopWords();
}
}
有几个 Web 客户端(在独立进程中):
Flux<TopWords> topWordsFlux = WebClient.create(".../top-words")
.method(HttpMethod.GET)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(TopWords.class)
.subscribe(System.out::println);
JavaScript 中有几个 EventSource 实例:
var eventSource = new EventSource(".../top-words");
eventSource.onmessage = function (e) {
console.log("Processing message: ", e.data);
};
只有前两个 "subscribers" 会开始接收消息(无论是 Web 客户端还是 EventSource 实例)。另一个将打开连接,获取 HTTP 200 状态,但事件流保持为空。客户端和服务器端都没有错误。
我不明白,“2 个订阅者”的限制在哪里。如果我想支持超过 2 个订阅者,我该怎么办?
该应用程序使用 Spring Boot 2.0.0.RELEASE 构建,并使用 spring-boot-starter-webflux 自动配置。默认配置不变。
我尝试调整的底层 API 存在限制(Twitter 流式传输 API)。
目标是连接到 Twitter 一次并处理各种不同订阅者的推文流。
最初我认为传递给 Flux.create
方法的发射器总是对所有订阅者使用相同的 FluxSink
。那当然没有意义。相反,FluxSink
被提供 per-subscriber,正如 javadoc 明确指出的那样。
我使用 Twitter 侦听器实现了我的用例,该侦听器允许注册(和 un-registration)多个 FluxSink
实例。这样,单个推文流可以被各种不同的订阅者订阅。
Flux<Tweet> flux = Flux.<Tweet>create(twitterListener::addSink);
我的 twitterListener
从 spring-social-twitter 项目实施 org.springframework.social.twitter.api.StreamListener
。