每当另一个 Observable 发射时跳过源的下一个发射的运算符

Operator that skips the next emission from the source whenever another Observable emits

我有一个用例,每当另一个通知器 Observable 发出时,我需要一个 Observable 来跳过它的下一次发射。

source:    |---X---X---X---X---X---X---X---X---X---X--|>
notifier:  |-------------X---------X----------X-------|>
result:    |---X---X---X-------X---X-------X-------X--|>

基本上,我想要一个名为 skipNextWhen 的运算符,它接受可观察的通知程序并跳过来自源的下一个发射。

我尝试使用一个使用 pausable 运算符的实现(使用 switchMap 重新实现),但无法让它工作。

pausable.ts

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        pausable: typeof pausable;
    }
}

function pausable<T>(notifier: Observable<boolean>): Observable<T> {
    return notifier.startWith(false).switchMap((paused) => {
        if (paused) {
            return Observable.never();
        } else {
            const source = new Subject();
            this.subscribe(source);
            return source;
        }
    });
}

Observable.prototype.pausable = pausable;

skipNextWhen.ts

import { Observable } from 'rxjs/Observable';
import './pausable';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        skipNextWhen: typeof skipNextWhen;
    }
}

function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
    const notifier = Observable.merge(this.map(() => false), 
                                      other.map(() => true));
    return this.pausable(notifier);
}

Observable.prototype.skipNextWhen = skipNextWhen;

是否有更合适的运算符我应该考虑改用?我在当前实现中看到的行为是结果 Observable 发出一次,然后再也不会发出——即使通知器 Observable 从不发出。

我可以想到两个解决方案:

  1. 使用 .filter().do() 和一些副作用。

    这个解决方案可能更容易理解,尽管它不是 "Rx" 的方式:

    function skipNextWhen(other) {
        let skipNext = false;
    
        return this.merge(other.do(() => skipNext = true).filter(() => false))
            .filter(val => {
                const doSkip = skipNext;
                skipNext = false;
                return !doSkip;
            });
    }
    

    我使用 merge() 只是为了更新 skipNextother 的值总是被忽略。

  2. 使用.scan():

    这个解决方案没有任何状态变量和副作用。

    function skipNextWhen(other) {
        const SKIP = 'skip';
    
        return this.merge(other.mapTo(SKIP))
            .scan((acc, val) => {
                if (acc === SKIP) {
                    return null;
                } else if (val === SKIP) {
                    return SKIP;
                } else {
                    return val;
                }
            }, [])
            .filter(val => Boolean(val) && val !== SKIP);
    }
    

    基本上,当 SKIP 到达时,我会立即 return 因为它会被 scan() 运算符再次传递到 acc 参数中,然后被 filter().

    如果我收到一个正常值但之前的值是 SKIP 我会忽略它并且 return 只是 null 后来被过滤掉。

两种解决方案给出相同的结果:

Observable.prototype.skipNextWhen = skipNextWhen;

const source = Observable.range(1, 10)
    .concatMap(val => Observable.of(val).delay(100));

source
    .skipNextWhen(Observable.interval(350))
    .subscribe(console.log);

这将打印以下内容:

1
2
3
5
6
8
9
10

请注意,您实际上并不是在创建新的运算符。您只有一个操作员链的快捷方式。例如,这不会让您在源完成时取消订阅 other

我已经开始了一个(非常)小的库,其中包含一些我想要的 rxjs 实用程序。它恰好有一个函数可以完全按照你的要求去做:skipAfter。来自文档:

source: -1-----2-----3-----4-----5-|
skip$:  ----0----------0-0----------

result: -1-----------3-----------5-|

图书馆在这里:https://github.com/simontonsoftware/s-rxjs-utils