Observable.merge 和 RxJava 中的去抖动
Observable.merge and debounce in RxJava
这是我的代码:
package com.example.myapplication;
import io.reactivex.Observable;
public class SampleRx {
Observable<String> getBoth() {
return Observable.merge(getSeq1(), getSeq2());
}
Observable<String> getSeq1() {
return Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(1_500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2_000);
emitter.onNext("E");
// Thread.sleep(2_000);
emitter.onComplete();
});
}
Observable<String> getSeq2() {
return Observable.create(emitter -> {
Thread.sleep(200);
emitter.onNext("1");
Thread.sleep(500);
emitter.onNext("2");
Thread.sleep(400);
emitter.onNext("3");
Thread.sleep(300);
emitter.onNext("4");
Thread.sleep(1_800);
emitter.onNext("5");
emitter.onComplete();
});
}
}
这是输出:
val=A
val=D
val=4
val=5
为什么 5
而 E
被忽略了(因为我猜它后面跟着 onComplete()
)。
运行 您的代码:
SampleRx().getBoth().subscribe(System.out::println);
我得到:
A
B
C
D
E
1
2
3
4
5
这是正确的行为,结果不会交错,因为这使用 calling/same 线程进行所有发射,并且 merge
仅在两个 Obervables 信号完成时完成。
为了实现合并的交错,每个 Observable
需要 运行 在不同的线程上,所以它们不会互相阻塞,所以如果每个 observable 都在 io
上订阅即
Observable.<String>create(emitter -> {
emitter.onNext(value);
...
...
emitter.onComplete();
}).subscribeOn(Schedulers.io());
然后你得到这个输出:
A, Thread : RxCachedThreadScheduler-1
1, Thread : RxCachedThreadScheduler-2
2, Thread : RxCachedThreadScheduler-2
3, Thread : RxCachedThreadScheduler-2
4, Thread : RxCachedThreadScheduler-2
B, Thread : RxCachedThreadScheduler-1
C, Thread : RxCachedThreadScheduler-1
D, Thread : RxCachedThreadScheduler-1
5, Thread : RxCachedThreadScheduler-2
E, Thread : RxCachedThreadScheduler-1
然后尊重不相互阻塞的独立排放。
您没有提供任何关于去抖动的信息,您在标题中包含了这些信息,所以我无法发表评论。
这是我的代码:
package com.example.myapplication;
import io.reactivex.Observable;
public class SampleRx {
Observable<String> getBoth() {
return Observable.merge(getSeq1(), getSeq2());
}
Observable<String> getSeq1() {
return Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(1_500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2_000);
emitter.onNext("E");
// Thread.sleep(2_000);
emitter.onComplete();
});
}
Observable<String> getSeq2() {
return Observable.create(emitter -> {
Thread.sleep(200);
emitter.onNext("1");
Thread.sleep(500);
emitter.onNext("2");
Thread.sleep(400);
emitter.onNext("3");
Thread.sleep(300);
emitter.onNext("4");
Thread.sleep(1_800);
emitter.onNext("5");
emitter.onComplete();
});
}
}
这是输出:
val=A
val=D
val=4
val=5
为什么 5
而 E
被忽略了(因为我猜它后面跟着 onComplete()
)。
运行 您的代码:
SampleRx().getBoth().subscribe(System.out::println);
我得到:
A
B
C
D
E
1
2
3
4
5
这是正确的行为,结果不会交错,因为这使用 calling/same 线程进行所有发射,并且 merge
仅在两个 Obervables 信号完成时完成。
为了实现合并的交错,每个 Observable
需要 运行 在不同的线程上,所以它们不会互相阻塞,所以如果每个 observable 都在 io
上订阅即
Observable.<String>create(emitter -> {
emitter.onNext(value);
...
...
emitter.onComplete();
}).subscribeOn(Schedulers.io());
然后你得到这个输出:
A, Thread : RxCachedThreadScheduler-1
1, Thread : RxCachedThreadScheduler-2
2, Thread : RxCachedThreadScheduler-2
3, Thread : RxCachedThreadScheduler-2
4, Thread : RxCachedThreadScheduler-2
B, Thread : RxCachedThreadScheduler-1
C, Thread : RxCachedThreadScheduler-1
D, Thread : RxCachedThreadScheduler-1
5, Thread : RxCachedThreadScheduler-2
E, Thread : RxCachedThreadScheduler-1
然后尊重不相互阻塞的独立排放。
您没有提供任何关于去抖动的信息,您在标题中包含了这些信息,所以我无法发表评论。