Reactor / WebFlux 实现了一个响应式的 http 新闻自动收报机
Reactor / WebFlux implement a reactive http news ticker
我有一个很简单的请求,但我无法在不泄漏资源的情况下将其提取出来。
我想要 return 类型 application/stream+json
的回复,其中包含某人发布的新闻事件。我不想使用 Websockets,不是因为我不喜欢它们,我只是想知道如何使用流。
为此,我需要从我的 restcontroller return Flux<News>
,一旦有人发布任何消息,它就会不断收到新闻。
我为此尝试创建一个发布者:
public class UpdatePublisher<T> implements Publisher<T> {
private List<Subscriber<? super T>> subscribers = new ArrayList<>();
@Override
public void subscribe(Subscriber<? super T> s) {
subscribers.add(s);
}
public void pushUpdate(T message) {
subscribers.forEach(s -> s.onNext(message));
}
}
还有一个简单的新闻对象:
public class News {
String message;
// Constructor, getters, some properties omitted for readability...
}
和发布消息的端点分别获取消息流
// ...
private UpdatePublisher<String> updatePublisher = new UpdatePublisher<>();
@GetMapping(value = "/news/ticker", produces = "application/stream+json")
public Flux<News> getUpdateStream() {
return Flux.from(updatePublisher).map(News::new);
}
@PutMapping("/news")
public void putNews(@RequestBody News news) {
updatePublisher.pushUpdate(news.getMessage());
}
这有效,但我无法取消订阅或再次访问任何给定的订阅 - 所以一旦客户端断开连接,updatePublisher
将继续推送到越来越多的死频道 - 因为我没有办法在订阅上调用 onCompleted()
处理程序。
TL;DL:
是否可以从另一个线程将消息推送到可能无限的 Flux 上,并且仍然按需终止 Flux,而不依赖于由对等异常重置或类似的东西?
你应该永远不要尝试自己实现Publisher
接口,因为它归结为正确实现反应流。这正是您在这里面临的问题。
相反,您应该使用 Reactor 本身提供的生成器运算符之一(这实际上是一个 Reactor 问题,与 Spring WebFlux 无关)。
在这种情况下,Flux.create
或 Flux.push
可能是最佳候选者,因为您的代码使用某种类型的事件侦听器将事件向下推送。 See the reactor project reference documentation on that.
如果没有更多的细节,很难给你一个具体的代码示例来解决你的问题。不过这里有一些提示:
- 如果您想要某种multicast-like通信模式
,您可能想要.share()
所有订阅者的事件流
- 请注意您想要的 push/pull/push+pull 模型;背压应该如何在这里工作?如果我们生成更多订阅者可以处理的事件怎么办?
- 此模型仅适用于单个应用程序实例。如果您希望它适用于多个应用程序实例,您可能需要使用代理
查看消息传递模式
我有一个很简单的请求,但我无法在不泄漏资源的情况下将其提取出来。
我想要 return 类型 application/stream+json
的回复,其中包含某人发布的新闻事件。我不想使用 Websockets,不是因为我不喜欢它们,我只是想知道如何使用流。
为此,我需要从我的 restcontroller return Flux<News>
,一旦有人发布任何消息,它就会不断收到新闻。
我为此尝试创建一个发布者:
public class UpdatePublisher<T> implements Publisher<T> {
private List<Subscriber<? super T>> subscribers = new ArrayList<>();
@Override
public void subscribe(Subscriber<? super T> s) {
subscribers.add(s);
}
public void pushUpdate(T message) {
subscribers.forEach(s -> s.onNext(message));
}
}
还有一个简单的新闻对象:
public class News {
String message;
// Constructor, getters, some properties omitted for readability...
}
和发布消息的端点分别获取消息流
// ...
private UpdatePublisher<String> updatePublisher = new UpdatePublisher<>();
@GetMapping(value = "/news/ticker", produces = "application/stream+json")
public Flux<News> getUpdateStream() {
return Flux.from(updatePublisher).map(News::new);
}
@PutMapping("/news")
public void putNews(@RequestBody News news) {
updatePublisher.pushUpdate(news.getMessage());
}
这有效,但我无法取消订阅或再次访问任何给定的订阅 - 所以一旦客户端断开连接,updatePublisher
将继续推送到越来越多的死频道 - 因为我没有办法在订阅上调用 onCompleted()
处理程序。
TL;DL:
是否可以从另一个线程将消息推送到可能无限的 Flux 上,并且仍然按需终止 Flux,而不依赖于由对等异常重置或类似的东西?
你应该永远不要尝试自己实现Publisher
接口,因为它归结为正确实现反应流。这正是您在这里面临的问题。
相反,您应该使用 Reactor 本身提供的生成器运算符之一(这实际上是一个 Reactor 问题,与 Spring WebFlux 无关)。
在这种情况下,Flux.create
或 Flux.push
可能是最佳候选者,因为您的代码使用某种类型的事件侦听器将事件向下推送。 See the reactor project reference documentation on that.
如果没有更多的细节,很难给你一个具体的代码示例来解决你的问题。不过这里有一些提示:
- 如果您想要某种multicast-like通信模式 ,您可能想要
- 请注意您想要的 push/pull/push+pull 模型;背压应该如何在这里工作?如果我们生成更多订阅者可以处理的事件怎么办?
- 此模型仅适用于单个应用程序实例。如果您希望它适用于多个应用程序实例,您可能需要使用代理 查看消息传递模式
.share()
所有订阅者的事件流