Observable.merge 和 RxJava 中的去抖动

Observable.merge and debounce in RxJava

这是我的代码:

package com.example.myapplication;

import io.reactivex.Observable;

public class SampleRx {

    Observable<String> getBoth() {
        return Observable.merge(getSeq1(), getSeq2());
    }

    Observable<String> getSeq1() {
        return Observable.create(emitter -> {
            emitter.onNext("A");

            Thread.sleep(1_500);
            emitter.onNext("B");

            Thread.sleep(500);
            emitter.onNext("C");

            Thread.sleep(250);
            emitter.onNext("D");

            Thread.sleep(2_000);
            emitter.onNext("E");
            // Thread.sleep(2_000);
            emitter.onComplete();
        });
    }

    Observable<String> getSeq2() {
        return Observable.create(emitter -> {
            Thread.sleep(200);
            emitter.onNext("1");

            Thread.sleep(500);
            emitter.onNext("2");

            Thread.sleep(400);
            emitter.onNext("3");

            Thread.sleep(300);
            emitter.onNext("4");

            Thread.sleep(1_800);
            emitter.onNext("5");
            emitter.onComplete();
        });
    }
}

这是输出:

val=A
val=D
val=4
val=5

为什么 5E 被忽略了(因为我猜它后面跟着 onComplete())。

运行 您的代码:

SampleRx().getBoth().subscribe(System.out::println);

我得到:

A
B 
C 
D 
E 
1 
2 
3 
4 
5 

这是正确的行为,结果不会交错,因为这使用 calling/same 线程进行所有发射,并且 merge 仅在两个 Obervables 信号完成时完成。

为了实现合并的交错,每个 Observable 需要 运行 在不同的线程上,所以它们不会互相阻塞,所以如果每个 observable 都在 io 上订阅即

Observable.<String>create(emitter -> {
            emitter.onNext(value);
            ...
            ...
            emitter.onComplete();
        }).subscribeOn(Schedulers.io()); 

然后你得到这个输出:

A, Thread : RxCachedThreadScheduler-1
1, Thread : RxCachedThreadScheduler-2
2, Thread : RxCachedThreadScheduler-2
3, Thread : RxCachedThreadScheduler-2
4, Thread : RxCachedThreadScheduler-2
B, Thread : RxCachedThreadScheduler-1
C, Thread : RxCachedThreadScheduler-1
D, Thread : RxCachedThreadScheduler-1
5, Thread : RxCachedThreadScheduler-2
E, Thread : RxCachedThreadScheduler-1

然后尊重不相互阻塞的独立排放。

您没有提供任何关于去抖动的信息,您在标题中包含了这些信息,所以我无法发表评论。