反应性存储库
Reactive repository
我尝试创建一些基于反应堆(Reactor + WebFlux)的基本 Spring 5 应用程序。
我的下一个目标是实现反应式存储库,它能够:
- 保存图书。
- 查找所有书籍。
我的存储库需要涵盖以下场景:
场景 A:
- 没有人订阅 FindAll
- 有人保存了一本书 (id = 1)
- Client1 订阅 FindAll
- 图书 (id=1) 被推送到 Client1(Client1 保持订阅状态,流未完成!)
- 有人保存了一本书 (id = 2)
- 图书 (id=2) 被推送到 Client1(Client1 保持订阅状态,流未完成!)
所以,IMO 这个场景是冷源和热源概念的混合体。在任何人订阅之前,我们会收集某人保存在我们存储库中某个缓冲区中的数据(假设是普通列表)。对于将订阅 FindAll 的所有订阅者,我们需要推送缓冲列表(在他的订阅之前收集)并且不要完成流以允许推送以后的集合更新。
我能够做到这一点,但我仍然在想是否有更简单的方法来做到这一点?也许 Reactor 项目中已经有一个解决方案已经涵盖了这种情况?
我的实现:
public class InMemoryBookRepository {
private final Map<String, Book> bookMap = new ConcurrentHashMap<>();
private final UnicastProcessor<Book> processor = UnicastProcessor.create();
private final FluxSink<Book> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<Book> hotFlux = processor.publish().autoConnect();
@Override
public void save(Book book) {
bookMap.put(book.getId(), book);
fluxSink.next(book);
}
@Override
public Flux<Book> findAll() {
//without fromIterable I cannot push books that where saved before someone subscribed
return Flux.fromIterable(bookMap.values())
.concatWith(hotFlux)
//Unfortunately this solution produces duplicates so we need to filter them
.distinct();
}
}
Ofc,我不能只使用 Cold publisher- 因为流将在发布收集的书籍后完成。出于同样的原因,我不能使用 Hot one,因为我会错过在某人订阅之前生成的元素。
旁注:在我的代码中,我的地图没有任何清理机制,因此它会在某些时候产生异常,但现在这并不重要。
这么简单。。。不知道为什么错过了这个漂亮的算子:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#replay--
所以基本上删除整个 List/Map
部分代码并使用 replay()
而不是 publish()
简化示例:
UnicastProcessor<String> processor = UnicastProcessor.create();
FluxSink<String> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
//change 'publish()' to 'replay()'
Flux<String> hotFlux = processor.publish().autoConnect();
hotFlux.subscribe(n -> log.info("1st subscriber: {}", n));
fluxSink.next("one");
hotFlux.subscribe(n -> log.info("2nd subscriber: {}", n));
fluxSink.next("two");
输出 publish()
:
1st subscriber: one
1st subscriber: two
2nd subscriber: two
输出 replay()
:
1st subscriber: one
2nd subscriber: one
1st subscriber: two
2nd subscriber: two
我尝试创建一些基于反应堆(Reactor + WebFlux)的基本 Spring 5 应用程序。
我的下一个目标是实现反应式存储库,它能够:
- 保存图书。
- 查找所有书籍。
我的存储库需要涵盖以下场景:
场景 A:
- 没有人订阅 FindAll
- 有人保存了一本书 (id = 1)
- Client1 订阅 FindAll
- 图书 (id=1) 被推送到 Client1(Client1 保持订阅状态,流未完成!)
- 有人保存了一本书 (id = 2)
- 图书 (id=2) 被推送到 Client1(Client1 保持订阅状态,流未完成!)
所以,IMO 这个场景是冷源和热源概念的混合体。在任何人订阅之前,我们会收集某人保存在我们存储库中某个缓冲区中的数据(假设是普通列表)。对于将订阅 FindAll 的所有订阅者,我们需要推送缓冲列表(在他的订阅之前收集)并且不要完成流以允许推送以后的集合更新。
我能够做到这一点,但我仍然在想是否有更简单的方法来做到这一点?也许 Reactor 项目中已经有一个解决方案已经涵盖了这种情况?
我的实现:
public class InMemoryBookRepository {
private final Map<String, Book> bookMap = new ConcurrentHashMap<>();
private final UnicastProcessor<Book> processor = UnicastProcessor.create();
private final FluxSink<Book> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<Book> hotFlux = processor.publish().autoConnect();
@Override
public void save(Book book) {
bookMap.put(book.getId(), book);
fluxSink.next(book);
}
@Override
public Flux<Book> findAll() {
//without fromIterable I cannot push books that where saved before someone subscribed
return Flux.fromIterable(bookMap.values())
.concatWith(hotFlux)
//Unfortunately this solution produces duplicates so we need to filter them
.distinct();
}
}
Ofc,我不能只使用 Cold publisher- 因为流将在发布收集的书籍后完成。出于同样的原因,我不能使用 Hot one,因为我会错过在某人订阅之前生成的元素。
旁注:在我的代码中,我的地图没有任何清理机制,因此它会在某些时候产生异常,但现在这并不重要。
这么简单。。。不知道为什么错过了这个漂亮的算子:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#replay--
所以基本上删除整个 List/Map
部分代码并使用 replay()
而不是 publish()
简化示例:
UnicastProcessor<String> processor = UnicastProcessor.create();
FluxSink<String> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
//change 'publish()' to 'replay()'
Flux<String> hotFlux = processor.publish().autoConnect();
hotFlux.subscribe(n -> log.info("1st subscriber: {}", n));
fluxSink.next("one");
hotFlux.subscribe(n -> log.info("2nd subscriber: {}", n));
fluxSink.next("two");
输出 publish()
:
1st subscriber: one
1st subscriber: two
2nd subscriber: two
输出 replay()
:
1st subscriber: one
2nd subscriber: one
1st subscriber: two
2nd subscriber: two