如何使用 RxJava 并发处理文本行

How to Concurrently Process Lines of Text with RxJava

我想知道如何使用 RxJava 同时处理文本行。现在,我所拥有的是一个来自条目集的可观察对象,以及一个在每个条目上处理 onNext 调用中的条目的订阅者。订阅者使用这行代码订阅了 observable

obs.observeOn(Schedulers.io()).subscribe(sub);

但是当我 运行 它时,它 运行 和顺序版本一样慢,而且似乎是顺序处理的。我如何使它并发?

您的 observeOn(Schedulers.io()) 调用意味着将在该线程上观察所有排放。你想让他们进入他们的自己的线程。

在这里,我使用 flatMap 为从源发出的每个项目创建一个新的可观察对象。在映射函数中,我必须 defer 处理工作直到订阅,否则整个链在处理完成时被阻塞。我还必须确保通过 subscribeOn.

在新线程上进行订阅
Random r = new Random();
Observable.from(new String[]{"First", "Second", "Third", "Fourth", "Fifth"})
    .flatMap(new Func1<String, Observable<String>>() {
        public Observable<String> call(final String s) {
            return Observable.defer(new Func0<Observable<String>>() {
                public Observable<String> call() {
                    Thread.sleep(r.nextInt(1000));
                    return Observable.just(s);
                }
            }).subscribeOn(Schedulers.newThread());
        }
    })
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println("Observed " + s + " on thread " + Thread.currentThread().getId());
        }
    });

这给了我这样的输出(注意乱序和在不同的线程上——即并行处理):

Observed Fourth on thread 17
Observed Second on thread 15
Observed Fifth on thread 18
Observed First on thread 14
Observed Third on thread 16