Reactive Streams:如何按键等待所有发布者?

Reactive Streams: How to wait for all publishers, by key?

假设我有 3 个发布者和 1 个处理者。发布者以 {key: <integer>, value: <object>, publisher_id: <string>}.

的形式发布项目

发布者进行IO操作,所以:

我实际上已经实现了一个 FluxProcessor,它有一个内部存储空间 (ConcurrentHashMap) 来保存所有项目。每当未达到 CAPACITY 时,它都会手动 request() 个新项目。

我想知道是否有使用 RxJava(2)/ Spring Reactor API?

的内置功能

在 RxJava 2 中使用 merge、rebatchRequests 和 toMultimap:

Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...

Flowable.merge(
    source1.rebatchRequests(N),
    source2.rebatchRequests(N),
    source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));