在单个 Observable 上并行化 map() 操作并乱序接收结果
Parallelize map() operation on single Observable and receive results out of order
给定一个 Observable<Input>
和一个昂贵但需要可变时间的映射函数 Function<Input, Output>
,有没有办法在多个输入上并行调用映射函数,并在它们的生产顺序?
我试过使用 observeOn()
和多线程 Scheduler
:
PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver = ...
// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingThreadPool();
// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
.observeOn(SchedulersFrom(exec))
.map(mf)
.publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();
inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input
但在测试中,mf.apply(fastInput)
直到 mf.apply(slowInput)
完成后才被调用。
如果我在 CountDownLatch
的测试中使用一些技巧来确保 mf.apply(slowInput)
直到 mf.apply(fastInput)
之后才能完成,程序就会死锁。
我应该在这里使用一些简单的运算符,还是 Observables
与 RxJava 的规则不符,我应该使用不同的技术?
ETA: 我查看了使用 ParallelFlowable
(在订阅 [=24= 之前将其转换回带有 .sequential()
的普通 Flowable
],或者更确切地说 mySubscriber1/2
),但随后我得到了额外的 mf.apply()
调用,每个 Subscriber
每个输入调用一个。有 ConnectableFlowable
,但我没有多少运气弄清楚如何将它与 .parallel()
混合使用。
我猜observeOn
运算符不支持单独并发执行。那么,使用 flatMap 怎么样?假设 mf
函数需要很多时间。
ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();
或
ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.map(mf))
.subscribeOn(SchedulersFrom(exec))
.publish();
编辑 2019-12-30
如果你想并发 运行 个任务,但应该保持顺序,请使用 concatMapEager
运算符而不是 flatMap
。
ConnectableObservable<Output> outputs = inputs
.concatMapEager(it -> Observable.just(it) // here
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();
对我来说听起来不可能,除非 Rx 有一些非常专业的操作员可以这样做。如果您使用 flatMap
进行映射,则元素将乱序到达。或者您可以使用 concatMap
但这样您将丢失您想要的并行映射。
编辑:正如另一位发帖者所提到的,concatMapEager 应该适用于此。并行订阅和有序结果。
给定一个 Observable<Input>
和一个昂贵但需要可变时间的映射函数 Function<Input, Output>
,有没有办法在多个输入上并行调用映射函数,并在它们的生产顺序?
我试过使用 observeOn()
和多线程 Scheduler
:
PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver = ...
// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingThreadPool();
// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
.observeOn(SchedulersFrom(exec))
.map(mf)
.publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();
inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input
但在测试中,mf.apply(fastInput)
直到 mf.apply(slowInput)
完成后才被调用。
如果我在 CountDownLatch
的测试中使用一些技巧来确保 mf.apply(slowInput)
直到 mf.apply(fastInput)
之后才能完成,程序就会死锁。
我应该在这里使用一些简单的运算符,还是 Observables
与 RxJava 的规则不符,我应该使用不同的技术?
ETA: 我查看了使用 ParallelFlowable
(在订阅 [=24= 之前将其转换回带有 .sequential()
的普通 Flowable
],或者更确切地说 mySubscriber1/2
),但随后我得到了额外的 mf.apply()
调用,每个 Subscriber
每个输入调用一个。有 ConnectableFlowable
,但我没有多少运气弄清楚如何将它与 .parallel()
混合使用。
我猜observeOn
运算符不支持单独并发执行。那么,使用 flatMap 怎么样?假设 mf
函数需要很多时间。
ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();
或
ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.map(mf))
.subscribeOn(SchedulersFrom(exec))
.publish();
编辑 2019-12-30
如果你想并发 运行 个任务,但应该保持顺序,请使用 concatMapEager
运算符而不是 flatMap
。
ConnectableObservable<Output> outputs = inputs
.concatMapEager(it -> Observable.just(it) // here
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();
对我来说听起来不可能,除非 Rx 有一些非常专业的操作员可以这样做。如果您使用 flatMap
进行映射,则元素将乱序到达。或者您可以使用 concatMap
但这样您将丢失您想要的并行映射。
编辑:正如另一位发帖者所提到的,concatMapEager 应该适用于此。并行订阅和有序结果。