Reactive Streams:如何按键等待所有发布者?
Reactive Streams: How to wait for all publishers, by key?
假设我有 3 个发布者和 1 个处理者。发布者以 {key: <integer>, value: <object>, publisher_id: <string>}
.
的形式发布项目
发布者进行IO操作,所以:
- 一方面,我希望出版商在给定时刻处理(大约)
N
个项目。
- 另一方面,我希望消费者 将项目合并 到一条记录(即
{key: <integer>, values: <list>}
)
我实际上已经实现了一个 FluxProcessor
,它有一个内部存储空间 (ConcurrentHashMap
) 来保存所有项目。每当未达到 CAPACITY 时,它都会手动 request()
个新项目。
我想知道是否有使用 RxJava(2)/ Spring Reactor API?
的内置功能
在 RxJava 2 中使用 merge、rebatchRequests 和 toMultimap:
Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...
Flowable.merge(
source1.rebatchRequests(N),
source2.rebatchRequests(N),
source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));
假设我有 3 个发布者和 1 个处理者。发布者以 {key: <integer>, value: <object>, publisher_id: <string>}
.
发布者进行IO操作,所以:
- 一方面,我希望出版商在给定时刻处理(大约)
N
个项目。 - 另一方面,我希望消费者 将项目合并 到一条记录(即
{key: <integer>, values: <list>}
)
我实际上已经实现了一个 FluxProcessor
,它有一个内部存储空间 (ConcurrentHashMap
) 来保存所有项目。每当未达到 CAPACITY 时,它都会手动 request()
个新项目。
我想知道是否有使用 RxJava(2)/ Spring Reactor API?
的内置功能在 RxJava 2 中使用 merge、rebatchRequests 和 toMultimap:
Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...
Flowable.merge(
source1.rebatchRequests(N),
source2.rebatchRequests(N),
source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));