Processing data drained from a Publisher in parallel

Processing pararelly drainded data from publisher

How do I process data that I subscribe to from a Publisher in parallel?

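  1. Should I call subscribe in a loop from a pool of workers? When I call subscribe in Project Reactor, I only get a single chunk of data. How do I "drain all" of it?
  2. How can I guarantee that each worker takes a different chunk of data?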

Convert each chunk of data into a task and submit it to an Executor. The converter, a Reactive Streams Subscriber, might look like this:

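import java.util.concurrent.Executor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Turns every element emitted by the Publisher into a task on the Executor.
class Converter<T> implements Subscriber<T> {
    final Executor executor;
    Subscription subscription;

    Converter(Executor executor) {
        this.executor = executor;
    }

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1); // ask for the first element
    }

    @Override
    public void onNext(T data) {
        executor.execute(() -> process(data)); // hand the element to a worker thread
        subscription.request(1);               // then request the next one; this is what drains the Publisher
    }
    ... // onError and onComplete omitted
    void process(T o) {
        ...
    }
}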
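To see the pattern end to end without pulling in Reactor, here is a minimal sketch. It assumes Java 9+ and swaps org.reactivestreams for the JDK's own java.util.concurrent.Flow interfaces, with SubmissionPublisher standing in for the real Publisher. The class name ParallelDrainDemo, the run helper and the Consumer-based process hook are hypothetical, chosen only for illustration. Because the Publisher delivers each element to this single subscriber exactly once, and each element becomes exactly one task, no two workers receive the same element, which answers question 2.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical demo class: the Converter idea written against java.util.concurrent.Flow.
public class ParallelDrainDemo {

    static final class Converter<T> implements Flow.Subscriber<T> {
        private final ExecutorService executor;
        private final Consumer<T> process;                 // hypothetical per-item work hook
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        Converter(ExecutorService executor, Consumer<T> process) {
            this.executor = executor;
            this.process = process;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                                  // ask for the first element
        }

        @Override
        public void onNext(T item) {
            executor.execute(() -> process.accept(item));  // one element -> one task
            subscription.request(1);                       // keep draining, one at a time
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();                              // every onNext has already run
        }
    }

    // Publishes 0..count-1, processes them on a pool of `workers` threads,
    // and returns the processed items sorted so duplicates or gaps are easy to spot.
    static List<Integer> run(int count, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        ConcurrentLinkedQueue<Integer> seen = new ConcurrentLinkedQueue<>();
        Converter<Integer> converter = new Converter<>(pool, seen::add);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(converter);
            for (int i = 0; i < count; i++) {
                publisher.submit(i);                       // blocks if the subscriber's buffer is full
            }
        }                                                  // close() signals onComplete after buffered items
        try {
            converter.done.await();                        // all elements handed to the pool
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);   // all tasks finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Integer> result = new ArrayList<>(seen);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(20, 4));
    }
}
```

Requesting one element at a time keeps the Publisher from flooding the subscriber, but note that the fixed thread pool's task queue is unbounded, so a fast Publisher can still pile up pending tasks there. Delaying request(1) until a worker is free would push that backpressure all the way to the source.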

In Project Reactor, use the parallel() operator:

Flux.from(thePublisher) // only needed if thePublisher is not already a Flux
    .parallel() // split the work into "rails"...
    // ...but so far all the rails still run on the same thread!
    .runOn(Schedulers.parallel()) // now each rail runs on its own worker thread
    .map(...).etc(...)
    .sequential() // merge the rails back into a single sequence
    // then subscribe, or continue processing sequentially
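The rail model can be approximated with plain JDK executors. This is a rough sketch to illustrate the idea, not how Reactor implements ParallelFlux: the hypothetical parallelMap helper deals items round-robin onto a fixed number of single-threaded executors (the "rails"), applies the mapping function on each rail's own thread, and merges the results into one collection as they finish. Items on the same rail are processed in order, but the merged result follows completion order, not source order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class RailsSketch {

    // Hypothetical helper: a plain-JDK approximation of
    // parallel(rails).runOn(...).map(fn).sequential(). fn must not return null.
    static <T, R> List<R> parallelMap(List<T> source, int rails, Function<T, R> fn) {
        // runOn(...): give every rail its own dedicated thread.
        List<ExecutorService> railThreads = new ArrayList<>();
        for (int r = 0; r < rails; r++) {
            railThreads.add(Executors.newSingleThreadExecutor());
        }

        // sequential(): all rails feed one merged collection, in completion order.
        ConcurrentLinkedQueue<R> merged = new ConcurrentLinkedQueue<>();

        // parallel(): deal items round-robin, so each item lands on exactly one rail.
        for (int i = 0; i < source.size(); i++) {
            T item = source.get(i);
            ExecutorService rail = railThreads.get(i % rails);
            rail.execute(() -> merged.add(fn.apply(item)));  // map(fn) on the rail's thread
        }

        // Wait for every rail to finish its queued work.
        for (ExecutorService rail : railThreads) {
            rail.shutdown();
        }
        try {
            for (ExecutorService rail : railThreads) {
                rail.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            input.add(i);
        }
        // Output order may differ between runs.
        System.out.println(parallelMap(input, 4, x -> x * x));
    }
}
```

Dedicating one thread per rail keeps the per-rail ordering guarantee simple; Reactor instead assigns rails to workers of a shared Scheduler, which avoids creating threads per pipeline.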

RxJava 2 is very similar and has the same parallel operator (on Flowable).