在单个 Observable 上并行化 map() 操作并乱序接收结果

Parallelize map() operation on single Observable and receive results out of order

给定一个 Observable<Input> 和一个昂贵但需要可变时间的映射函数 Function<Input, Output>,有没有办法在多个输入上并行调用映射函数,并在它们的生产顺序?

我试过使用 observeOn() 和多线程 Scheduler:

PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver = ...

// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingThreadPool();

// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
    .observeOn(SchedulersFrom(exec))
    .map(mf)
    .publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();

inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input

但在测试中,mf.apply(fastInput) 直到 mf.apply(slowInput) 完成后才被调用。

如果我在 CountDownLatch 的测试中使用一些技巧来确保 mf.apply(slowInput) 直到 mf.apply(fastInput) 之后才能完成,程序就会死锁。

我应该在这里使用一些简单的运算符,还是 Observables 与 RxJava 的规则不符,我应该使用不同的技术?


ETA: 我查看了使用 ParallelFlowable(在订阅 [=24= 之前将其转换回带有 .sequential() 的普通 Flowable ],或者更确切地说 mySubscriber1/2),但随后我得到了额外的 mf.apply() 调用,每个 Subscriber 每个输入调用一个。有 ConnectableFlowable,但我没有多少运气弄清楚如何将它与 .parallel() 混合使用。

我猜observeOn运算符不支持单独并发执行。那么,使用 flatMap 怎么样?假设 mf 函数需要很多时间。

    ConnectableObservable<Output> outputs = inputs
        .flatMap(it -> Observable.just(it)
            .observeOn(SchedulersFrom(exec))
            .map(mf))
        .publish();

    ConnectableObservable<Output> outputs = inputs
        .flatMap(it -> Observable.just(it)
            .map(mf))
            .subscribeOn(SchedulersFrom(exec))
        .publish();

编辑 2019-12-30

如果你想并发 运行 个任务,但应该保持顺序,请使用 concatMapEager 运算符而不是 flatMap

    ConnectableObservable<Output> outputs = inputs
        .concatMapEager(it -> Observable.just(it) // here
            .observeOn(SchedulersFrom(exec))
            .map(mf))
        .publish();

对我来说听起来不可能,除非 Rx 有一些非常专业的操作员可以这样做。如果您使用 flatMap 进行映射,则元素将乱序到达。或者您可以使用 concatMap 但这样您将丢失您想要的并行映射。

编辑:正如另一位发帖者所提到的,concatMapEager 应该适用于此。并行订阅和有序结果。