RxJava - flatmap vs concatMap - 为什么在订阅时订购相同?
RxJava - flatmap vs concatMap - why is ordering the same on subscription?
根据 this thread conCatMap 和 flatmap 仅在项目发出的顺序上有所不同。所以我做了一个测试并创建了一个简单的整数流,想看看它们的发射顺序。我做了一个小的 observable,它将接受 1-5 范围内的数字并将它们乘以 2。简单。
这是带有平面图的代码:
myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer * 2);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.v("myapp","from flatMap:"+integer);
}
});
和使用 concatMap 的完全相同的代码:
myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer * 2);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.v("myapp","from concatmap:"+integer);
}
});
当我在日志中看到打印出来时,两者的顺序相同,为什么?我以为只有 concatMap 会保留顺序?
您所看到的纯属巧合。每次你的 flatMap
returns 一个值,它都会在与前一个线程相同的线程上这样做。
我修改了您的示例以利用多线程:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(integer -> Observable.just(integer)
.observeOn(Schedulers.computation())
.flatMap(i -> {
try {
Thread.sleep(new Random().nextInt(1000));
return Observable.just(2 * i);
} catch (InterruptedException e) {
e.printStackTrace();
return Observable.error(e);
}
}))
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("onCompleted"));
我通过随机延迟延迟每个 2 * i
值以强制不同的顺序。此外,我在此之前添加了 observeOn(Schedulers.computation())
,以便下一个运算符 (flatMap
) 在计算线程池上运行——这具有多线程魔力。
这是我的示例(在 Android 上)得到的输出:
I/System.out: 6
I/System.out: 4
I/System.out: 12
I/System.out: 14
I/System.out: 8
I/System.out: 2
I/System.out: 16
I/System.out: 20
I/System.out: 10
I/System.out: 18
I/System.out: onCompleted
如果我将 just
之后的 flatMap
替换为 concatMap
,那么我会得到正确排序的输出。
有一个 great post by Thomas Nield 有适当的解释。
根据 this thread conCatMap 和 flatmap 仅在项目发出的顺序上有所不同。所以我做了一个测试并创建了一个简单的整数流,想看看它们的发射顺序。我做了一个小的 observable,它将接受 1-5 范围内的数字并将它们乘以 2。简单。
这是带有平面图的代码:
myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer * 2);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.v("myapp","from flatMap:"+integer);
}
});
和使用 concatMap 的完全相同的代码:
myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer * 2);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.v("myapp","from concatmap:"+integer);
}
});
当我在日志中看到打印出来时,两者的顺序相同,为什么?我以为只有 concatMap 会保留顺序?
您所看到的纯属巧合。每次你的 flatMap
returns 一个值,它都会在与前一个线程相同的线程上这样做。
我修改了您的示例以利用多线程:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(integer -> Observable.just(integer)
.observeOn(Schedulers.computation())
.flatMap(i -> {
try {
Thread.sleep(new Random().nextInt(1000));
return Observable.just(2 * i);
} catch (InterruptedException e) {
e.printStackTrace();
return Observable.error(e);
}
}))
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("onCompleted"));
我通过随机延迟延迟每个 2 * i
值以强制不同的顺序。此外,我在此之前添加了 observeOn(Schedulers.computation())
,以便下一个运算符 (flatMap
) 在计算线程池上运行——这具有多线程魔力。
这是我的示例(在 Android 上)得到的输出:
I/System.out: 6
I/System.out: 4
I/System.out: 12
I/System.out: 14
I/System.out: 8
I/System.out: 2
I/System.out: 16
I/System.out: 20
I/System.out: 10
I/System.out: 18
I/System.out: onCompleted
如果我将 just
之后的 flatMap
替换为 concatMap
,那么我会得到正确排序的输出。
有一个 great post by Thomas Nield 有适当的解释。