如何使用 RxJava 并发处理文本行
How to Concurrently Process Lines of Text with RxJava
我想知道如何使用 RxJava 同时处理文本行。现在,我所拥有的是一个来自条目集的可观察对象,以及一个在每个条目上处理 onNext 调用中的条目的订阅者。订阅者使用这行代码订阅了 observable
obs.observeOn(Schedulers.io()).subscribe(sub);
但是当我 运行 它时,它 运行 和顺序版本一样慢,而且似乎是顺序处理的。我如何使它并发?
您的 observeOn(Schedulers.io())
调用意味着将在该线程上观察所有排放。你想让他们进入他们的自己的线程。
在这里,我使用 flatMap
为从源发出的每个项目创建一个新的可观察对象。在映射函数中,我必须 defer
处理工作直到订阅,否则整个链在处理完成时被阻塞。我还必须确保通过 subscribeOn
.
在新线程上进行订阅
Random r = new Random();
Observable.from(new String[]{"First", "Second", "Third", "Fourth", "Fifth"})
.flatMap(new Func1<String, Observable<String>>() {
public Observable<String> call(final String s) {
return Observable.defer(new Func0<Observable<String>>() {
public Observable<String> call() {
Thread.sleep(r.nextInt(1000));
return Observable.just(s);
}
}).subscribeOn(Schedulers.newThread());
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Observed " + s + " on thread " + Thread.currentThread().getId());
}
});
这给了我这样的输出(注意乱序和在不同的线程上——即并行处理):
Observed Fourth on thread 17
Observed Second on thread 15
Observed Fifth on thread 18
Observed First on thread 14
Observed Third on thread 16
我想知道如何使用 RxJava 同时处理文本行。现在,我所拥有的是一个来自条目集的可观察对象,以及一个在每个条目上处理 onNext 调用中的条目的订阅者。订阅者使用这行代码订阅了 observable
obs.observeOn(Schedulers.io()).subscribe(sub);
但是当我 运行 它时,它 运行 和顺序版本一样慢,而且似乎是顺序处理的。我如何使它并发?
您的 observeOn(Schedulers.io())
调用意味着将在该线程上观察所有排放。你想让他们进入他们的自己的线程。
在这里,我使用 flatMap
为从源发出的每个项目创建一个新的可观察对象。在映射函数中,我必须 defer
处理工作直到订阅,否则整个链在处理完成时被阻塞。我还必须确保通过 subscribeOn
.
Random r = new Random();
Observable.from(new String[]{"First", "Second", "Third", "Fourth", "Fifth"})
.flatMap(new Func1<String, Observable<String>>() {
public Observable<String> call(final String s) {
return Observable.defer(new Func0<Observable<String>>() {
public Observable<String> call() {
Thread.sleep(r.nextInt(1000));
return Observable.just(s);
}
}).subscribeOn(Schedulers.newThread());
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Observed " + s + " on thread " + Thread.currentThread().getId());
}
});
这给了我这样的输出(注意乱序和在不同的线程上——即并行处理):
Observed Fourth on thread 17
Observed Second on thread 15
Observed Fifth on thread 18
Observed First on thread 14
Observed Third on thread 16