rxjava zip 不适用于指定的调度程序?
rxjava zip not working on specified schedulers?
我有这个实现,其中有 2 个单曲,它们是对数据库的并行调用以获取对象。然后我使用 rxjava zip 运算符来聚合结果。如下所示:
...
(other code)
Single<A> a = Single.fromCallable(() -> {
System.out.println("Inside single a, running in thread " + Thread.currentThread().getName());
}).subscribeOn(Schedulers.io);
Single<B> b = Single.fromCallable(() -> {
System.out.println("Inside single b, running in thread " + Thread.currentThread().getName());
}).subscribeOn(Schedulers.io);
Single<C> result = Single.zip(a, b, aggregationFunc2())
.subscribeOn(Schedulers.computation())
.subscribe(response::resume, response::resume);
private Func2<a, b, c> aggregationFunc2() {
System.out.println("Inside aggregationFunc2, running in thread " + Thread.currentThread().getName());
return (a, b) -> {
CBuilder cBuilder = new Builder();
if (a != null) {
cBuilder.setA(a);
}
if (b != null) {
cBuilder.setB(b);
}
return cBuilder.build();
};
}
但是,我得到的日志记录是 a 和 b 运行 都在具有不同 io 线程的 IO 线程池中,比如 io-2 和 io-3。这是预料之中的,因为我指定每个单独的都在 IO 调度程序中。但是 zip 函数在默认主线程中仍然是 运行,而我希望它在计算线程中。不知道为什么,知道吗?
此外,我尝试了 observeOn() for zip,它仍然 运行 在默认主线程中。
因为你在主线程上调用了aggregationFunc2
。
稍微重写一下代码应该可以揭示误解:
Func2<A, B, C> f = aggregationFunc2();
Single<C> result = Single.zip(a, b, f);
您在创建和返回函数本身之前进行打印。您必须将打印语句移动到创建的函数中:
private Func2<a, b, c> aggregationFunc2() {
return (a, b) -> {
System.out.println("Inside aggregationFunc2, running in thread "
+ Thread.currentThread().getName());
CBuilder cBuilder = new Builder();
if (a != null) {
cBuilder.setA(a);
}
if (b != null) {
cBuilder.setB(b);
}
return cBuilder.build();
};
}
我有这个实现,其中有 2 个单曲,它们是对数据库的并行调用以获取对象。然后我使用 rxjava zip 运算符来聚合结果。如下所示:
...
(other code)
Single<A> a = Single.fromCallable(() -> {
System.out.println("Inside single a, running in thread " + Thread.currentThread().getName());
}).subscribeOn(Schedulers.io);
Single<B> b = Single.fromCallable(() -> {
System.out.println("Inside single b, running in thread " + Thread.currentThread().getName());
}).subscribeOn(Schedulers.io);
Single<C> result = Single.zip(a, b, aggregationFunc2())
.subscribeOn(Schedulers.computation())
.subscribe(response::resume, response::resume);
private Func2<a, b, c> aggregationFunc2() {
System.out.println("Inside aggregationFunc2, running in thread " + Thread.currentThread().getName());
return (a, b) -> {
CBuilder cBuilder = new Builder();
if (a != null) {
cBuilder.setA(a);
}
if (b != null) {
cBuilder.setB(b);
}
return cBuilder.build();
};
}
但是,我得到的日志记录是 a 和 b 运行 都在具有不同 io 线程的 IO 线程池中,比如 io-2 和 io-3。这是预料之中的,因为我指定每个单独的都在 IO 调度程序中。但是 zip 函数在默认主线程中仍然是 运行,而我希望它在计算线程中。不知道为什么,知道吗?
此外,我尝试了 observeOn() for zip,它仍然 运行 在默认主线程中。
因为你在主线程上调用了aggregationFunc2
。
稍微重写一下代码应该可以揭示误解:
Func2<A, B, C> f = aggregationFunc2();
Single<C> result = Single.zip(a, b, f);
您在创建和返回函数本身之前进行打印。您必须将打印语句移动到创建的函数中:
private Func2<a, b, c> aggregationFunc2() {
return (a, b) -> {
System.out.println("Inside aggregationFunc2, running in thread "
+ Thread.currentThread().getName());
CBuilder cBuilder = new Builder();
if (a != null) {
cBuilder.setA(a);
}
if (b != null) {
cBuilder.setB(b);
}
return cBuilder.build();
};
}