rxJava 如何在多线程上制作 flatMap 运行
rxJava how to make flatMap run on multi threads
我希望从 flatMap 发送到 运行 的每个项目都在其自己的线程上
这是实际用法的简化示例,其中每个项目都是一个 url 请求。
在单个线程
上的每个仍然 运行 上添加 subscribeOn(Schedulers.io())
这里的规则是什么?
Integer[] array= new Integer[100];
for (int i = 0; i < 100; i++){
array[i] = i+1;
}
Observable.fromArray(array)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer i) throws Throwable {
Log.i(TAG, "apply " + i + " " + Thread.currentThread().getName());
return Single.just(i).subscribeOn(Schedulers.io());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer i) {
// Log.i(TAG, "onNext " + Thread.currentThread().getName() + i);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
结果:
2020-12-16 22:54:47.010 10649-10700/com.example.rxjava I/MYTAG: apply 1 RxCachedThreadScheduler-1
2020-12-16 22:54:47.037 10649-10700/com.example.rxjava I/MYTAG: apply 2 RxCachedThreadScheduler-1
2020-12-16 22:54:47.038 10649-10700/com.example.rxjava I/MYTAG: apply 3 RxCachedThreadScheduler-1
2020-12-16 22:54:47.039 10649-10700/com.example.rxjava I/MYTAG: apply 4 RxCachedThreadScheduler-1
2020-12-16 22:54:47.040 10649-10700/com.example.rxjava I/MYTAG: apply 5 RxCachedThreadScheduler-1
2020-12-16 22:54:47.043 10649-10700/com.example.rxjava I/MYTAG: apply 6 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 7 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 8 RxCachedThreadScheduler-1
除了 just
的使用外,您走在正确的轨道上,它需要一个 现有的 对象,因此之前创建和计算该对象的任何事情都发生了。在这种情况下,它是从同一线程调用的 flatMapSingle
的 lambda。
您必须通过 fromCallable
使计算本身成为流程的一部分才能成为 运行 并行,例如:
Observable.fromArray(array)
.flatMapSingle(i -> {
return Single.fromCallable(() -> {
Log.i(TAG, "apply " + i + " " + Thread.currentThread().getName());
return i + 1000;
})
.subscribeOn(Schedulers.io());
})
.observeOn(AndroidSchedulers.mainThread())
// ...
;
我希望从 flatMap 发送到 运行 的每个项目都在其自己的线程上
这是实际用法的简化示例,其中每个项目都是一个 url 请求。
在单个线程
上的每个仍然 运行 上添加 subscribeOn(Schedulers.io())
这里的规则是什么?
Integer[] array= new Integer[100];
for (int i = 0; i < 100; i++){
array[i] = i+1;
}
Observable.fromArray(array)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer i) throws Throwable {
Log.i(TAG, "apply " + i + " " + Thread.currentThread().getName());
return Single.just(i).subscribeOn(Schedulers.io());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer i) {
// Log.i(TAG, "onNext " + Thread.currentThread().getName() + i);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
结果:
2020-12-16 22:54:47.010 10649-10700/com.example.rxjava I/MYTAG: apply 1 RxCachedThreadScheduler-1
2020-12-16 22:54:47.037 10649-10700/com.example.rxjava I/MYTAG: apply 2 RxCachedThreadScheduler-1
2020-12-16 22:54:47.038 10649-10700/com.example.rxjava I/MYTAG: apply 3 RxCachedThreadScheduler-1
2020-12-16 22:54:47.039 10649-10700/com.example.rxjava I/MYTAG: apply 4 RxCachedThreadScheduler-1
2020-12-16 22:54:47.040 10649-10700/com.example.rxjava I/MYTAG: apply 5 RxCachedThreadScheduler-1
2020-12-16 22:54:47.043 10649-10700/com.example.rxjava I/MYTAG: apply 6 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 7 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 8 RxCachedThreadScheduler-1
除了 just
的使用外,您走在正确的轨道上,它需要一个 现有的 对象,因此之前创建和计算该对象的任何事情都发生了。在这种情况下,它是从同一线程调用的 flatMapSingle
的 lambda。
您必须通过 fromCallable
使计算本身成为流程的一部分才能成为 运行 并行,例如:
Observable.fromArray(array)
.flatMapSingle(i -> {
return Single.fromCallable(() -> {
Log.i(TAG, "apply " + i + " " + Thread.currentThread().getName());
return i + 1000;
})
.subscribeOn(Schedulers.io());
})
.observeOn(AndroidSchedulers.mainThread())
// ...
;