RxJava3:debounce/skip 整数流中不需要的“0”,但只有在连续获取时才需要“0”

RxJava3 : debounce/skip unwanted '0' in a flow of integers but want '0' only if getting continuously

我有一个 Integer 类型的 Observable,它发出 0 和正数,还有一个 Observer 来捕捉两者。

Observable 每 10 毫秒发出连续的零或连续的正数(一次一个项目)。

0,0,0,0,0,0,0,5,5,6,7,8,8,9,10,11,4,5,6,5,0,0,0,0,0...[预期]

它还在正数之间发出零(很少)。

0,0,0,0,0,0,0,5,5,6,7,8,8,**0**,9,10,11,4,5,6,5,0,0,0,0,0...[意外]

我想 debounce/skip 这个零出现在两个正数之间,但如果它是连续的,我有兴趣捕捉 0。

在 rxjava 中是否有任何运算符组合可以实现此目的。提前致谢。

代码看起来像这样:

public Observer<Integer> valueObserver = new DisposingObserver<Integer>() {
        @Override
        public void onNext(Integer value) {
         //every 10 seconds a value is received
         //do action based on zero or non-zero values.
        }
    };

Observable<Integer> sourceObservable = Observable.just(0,0,0,0,0,0,0,5,5,6,7,8,8,0,9,10,11,4,5,6,5,0,0,0,0,0,...);
sourceObservable.subscribe(valueObserver);

预期o/p:0,5,6,7,8,9,10,11,4,5,6,5,0

我可以使用 distinctUntilChanged 运算符消除连续的重复元素。

这需要记住流到目前为止产生了多少个后续零并且什么都不发出,一个零,两个零或非零项。您可以使用 flatMapIterable 执行此操作,但计算零是有状态的,因此需要 defer:

static ObservableTransformer<Integer, Integer> skipSingleZero() {
    return source -> Observable.defer(() -> {
        AtomicInteger zerosSeen = new AtomicInteger();

        return source
        .flatMapIterable(item -> {
            if (item == 0) {
                int n = zerosSeen.getAndIncrement();
                // first zero we saw, don't emit it
                if (n == 0) {
                    return Collections.emptyList();
                }
                // the second zero in a row, it means a streak so emit both
                if (n == 1) {
                    return Arrays.asList(0, 0);
                }
                // third or more zeros, just emit those now on
                return Collections.singletonList(0);
            }
            // a non-zero item, reset the counter
            zerosSeen.set(0);
            return Collections.singletonList(item);
        });
    });
}

这是一个中间有一个零的序列,因此应该跳过:

Observable<Integer> source = Observable.fromArray(
        0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
        0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);

source
.compose(skipSingleZero())
.test()
.assertResult(
        0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
        9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);

中间有一条 2 条条纹,应该保留:

Observable<Integer> source = Observable.fromArray(
        0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
        0, 0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
source
.compose(skipSingleZero())
.test()
.assertResult(
        0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
        0, 0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);