反应性存储库

Reactive repository

我尝试创建一些基于反应堆(Reactor + WebFlux)的基本 Spring 5 应用程序。

我的下一个目标是实现反应式存储库,它能够:

  1. 保存图书。
  2. 查找所有书籍。

我的存储库需要涵盖以下场景:

场景 A:

  1. 没有人订阅 FindAll
  2. 有人保存了一本书 (id = 1)
  3. Client1 订阅 FindAll
  4. 图书 (id=1) 被推送到 Client1(Client1 保持订阅状态,流未完成!)
  5. 有人保存了一本书 (id = 2)
  6. 图书 (id=2) 被推送到 Client1(Client1 保持订阅状态,流未完成!)

所以,IMO 这个场景是冷源和热源概念的混合体。在任何人订阅之前,我们会收集某人保存在我们存储库中某个缓冲区中的数据(假设是普通列表)。对于将订阅 FindAll 的所有订阅者,我们需要推送缓冲列表(在他的订阅之前收集)并且不要完成流以允许推送以后的集合更新。

我能够做到这一点,但我仍然在想是否有更简单的方法来做到这一点?也许 Reactor 项目中已经有一个解决方案已经涵盖了这种情况?

我的实现:

public class InMemoryBookRepository {

private final Map<String, Book> bookMap = new ConcurrentHashMap<>();
private final UnicastProcessor<Book> processor = UnicastProcessor.create();
private final FluxSink<Book> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<Book> hotFlux = processor.publish().autoConnect();

@Override
public void save(Book book) {
    bookMap.put(book.getId(), book);
    fluxSink.next(book);
}

@Override
public Flux<Book> findAll() {
    //without fromIterable I cannot push books that where saved before someone subscribed
    return Flux.fromIterable(bookMap.values())
            .concatWith(hotFlux)
            //Unfortunately this solution produces duplicates so we need to filter them
            .distinct();
}
}

Ofc,我不能只使用 Cold publisher- 因为流将在发布收集的书籍后完成。出于同样的原因,我不能使用 Hot one,因为我会错过在某人订阅之前生成的元素。

旁注:在我的代码中,我的地图没有任何清理机制,因此它会在某些时候产生异常,但现在这并不重要。

这么简单。。。不知道为什么错过了这个漂亮的算子:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#replay--

所以基本上删除整个 List/Map 部分代码并使用 replay() 而不是 publish()

简化示例:

UnicastProcessor<String> processor = UnicastProcessor.create();
FluxSink<String> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
//change 'publish()' to 'replay()'
Flux<String> hotFlux = processor.publish().autoConnect(); 

hotFlux.subscribe(n -> log.info("1st subscriber: {}", n));

fluxSink.next("one");

hotFlux.subscribe(n -> log.info("2nd subscriber: {}", n));

fluxSink.next("two");

输出 publish():

1st subscriber: one
1st subscriber: two
2nd subscriber: two

输出 replay():

1st subscriber: one
2nd subscriber: one
1st subscriber: two
2nd subscriber: two