Processing data drained from a publisher in parallel
How do I process data subscribed from a Publisher in parallel?
- Should I subscribe in a loop across a worker pool? When I call subscribe in Project Reactor, I only get a single chunk of data. How do I "drain all"?
- How do I guarantee that each worker takes a different chunk of data?
Convert each chunk of data into a task and submit it to an Executor. The converter might look like this:
class Converter<T> implements Subscriber<T> {
    final Executor executor;
    Subscription subscription;

    Converter(Executor executor) {
        this.executor = executor;
    }

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1); // pull the first item
    }

    @Override
    public void onNext(T data) {
        executor.execute(() -> process(data));
        subscription.request(1); // request the next item once this one is dispatched
    }

    @Override
    public void onError(Throwable t) { ... }

    @Override
    public void onComplete() { ... }

    void process(T o) {
        ...
    }
}
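Since Java 9, the JDK ships the same Reactive Streams interfaces under java.util.concurrent.Flow, so the idea above can be sketched and run without any external dependency. The names here (ConverterDemo, the item count, the pool size) are illustrative, and the counter stands in for real work:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ConverterDemo {
    // Mirrors the Converter above; Flow.Subscriber has the same shape
    // as org.reactivestreams.Subscriber.
    static class Converter<T> implements Flow.Subscriber<T> {
        final ExecutorService executor;
        final AtomicInteger processed = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        Flow.Subscription subscription;

        Converter(ExecutorService executor) {
            this.executor = executor;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);            // pull the first item
        }

        @Override
        public void onNext(T data) {
            executor.execute(() -> process(data));
            subscription.request(1); // request the next item
        }

        @Override
        public void onError(Throwable t) { done.countDown(); }

        @Override
        public void onComplete() { done.countDown(); }

        void process(T o) {
            processed.incrementAndGet(); // stand-in for real work
        }
    }

    static int run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Converter<Integer> converter = new Converter<>(pool);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(converter);
            for (int i = 0; i < 100; i++) publisher.submit(i);
        }                                      // close() triggers onComplete
        converter.done.await(5, TimeUnit.SECONDS);
        pool.shutdown();                       // wait for dispatched tasks to finish
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return converter.processed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());             // all 100 items processed
    }
}
```

Because each item is requested only after the previous one has been handed to the pool, no two workers ever receive the same chunk.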
In Project Reactor, use the parallel() operator:
Flux.from(thePublisher)            // if the publisher is not already a Flux
    .parallel()                    // instruct the Flux to divide work onto "rails",
                                   // but so far these rails run on the same thread!
    .runOn(Schedulers.parallel())  // now each rail runs on its own thread
    .map(...).etc(...)
    .sequential()                  // merge the rails back into a single sequence
    // subscribe, or continue processing sequentially
RxJava 2 is very similar and has the same parallel operator.
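If you don't have Reactor on the classpath, the rails idea can be approximated with a plain JDK parallel stream, which likewise fans work out over a pool and merges results back in encounter order. This is only an analogy (streams have no backpressure), with illustrative values:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RailsAnalogy {
    static List<Integer> run() {
        // Fan out over the common ForkJoinPool (roughly parallel().runOn(...)),
        // transform on each rail (roughly map(...)), then collect back
        // preserving encounter order (roughly sequential()).
        return IntStream.rangeClosed(1, 10)
                .parallel()
                .map(i -> i * i)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```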