每当另一个 Observable 发射时跳过源的下一个发射的运算符
Operator that skips the next emission from the source whenever another Observable emits
我有一个用例,每当另一个通知器 Observable 发出时,我需要一个 Observable 来跳过它的下一次发射。
source: |---X---X---X---X---X---X---X---X---X---X--|>
notifier: |-------------X---------X----------X-------|>
result: |---X---X---X-------X---X-------X-------X--|>
基本上,我想要一个名为 skipNextWhen
的运算符,它接受可观察的通知程序并跳过来自源的下一个发射。
我尝试使用一个使用 pausable
运算符的实现(使用 switchMap
重新实现),但无法让它工作。
pausable.ts
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';
declare module 'rxjs/Observable' {
interface Observable<T> {
pausable: typeof pausable;
}
}
function pausable<T>(notifier: Observable<boolean>): Observable<T> {
return notifier.startWith(false).switchMap((paused) => {
if (paused) {
return Observable.never();
} else {
const source = new Subject();
this.subscribe(source);
return source;
}
});
}
Observable.prototype.pausable = pausable;
skipNextWhen.ts
import { Observable } from 'rxjs/Observable';
import './pausable';
declare module 'rxjs/Observable' {
interface Observable<T> {
skipNextWhen: typeof skipNextWhen;
}
}
function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
const notifier = Observable.merge(this.map(() => false),
other.map(() => true));
return this.pausable(notifier);
}
Observable.prototype.skipNextWhen = skipNextWhen;
是否有更合适的运算符我应该考虑改用?我在当前实现中看到的行为是结果 Observable 发出一次,然后再也不会发出——即使通知器 Observable 从不发出。
我可以想到两个解决方案:
使用 .filter()
、.do()
和一些副作用。
这个解决方案可能更容易理解,尽管它不是 "Rx" 的方式:
function skipNextWhen(other) {
let skipNext = false;
return this.merge(other.do(() => skipNext = true).filter(() => false))
.filter(val => {
const doSkip = skipNext;
skipNext = false;
return !doSkip;
});
}
我使用 merge()
只是为了更新 skipNext
,other
的值总是被忽略。
使用.scan()
:
这个解决方案没有任何状态变量和副作用。
function skipNextWhen(other) {
const SKIP = 'skip';
return this.merge(other.mapTo(SKIP))
.scan((acc, val) => {
if (acc === SKIP) {
return null;
} else if (val === SKIP) {
return SKIP;
} else {
return val;
}
}, [])
.filter(val => Boolean(val) && val !== SKIP);
}
基本上,当 SKIP
到达时,我会立即 return 因为它会被 scan()
运算符再次传递到 acc
参数中,然后被 filter()
.
如果我收到一个正常值但之前的值是 SKIP
我会忽略它并且 return 只是 null
后来被过滤掉。
两种解决方案给出相同的结果:
Observable.prototype.skipNextWhen = skipNextWhen;
const source = Observable.range(1, 10)
.concatMap(val => Observable.of(val).delay(100));
source
.skipNextWhen(Observable.interval(350))
.subscribe(console.log);
这将打印以下内容:
1
2
3
5
6
8
9
10
请注意,您实际上并不是在创建新的运算符。您只有一个操作员链的快捷方式。例如,这不会让您在源完成时取消订阅 other
。
我已经开始了一个(非常)小的库,其中包含一些我想要的 rxjs 实用程序。它恰好有一个函数可以完全按照你的要求去做:skipAfter
。来自文档:
source: -1-----2-----3-----4-----5-|
skip$: ----0----------0-0----------
result: -1-----------3-----------5-|
我有一个用例,每当另一个通知器 Observable 发出时,我需要一个 Observable 来跳过它的下一次发射。
source: |---X---X---X---X---X---X---X---X---X---X--|>
notifier: |-------------X---------X----------X-------|>
result: |---X---X---X-------X---X-------X-------X--|>
基本上,我想要一个名为 skipNextWhen
的运算符,它接受可观察的通知程序并跳过来自源的下一个发射。
我尝试使用一个使用 pausable
运算符的实现(使用 switchMap
重新实现),但无法让它工作。
pausable.ts
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';
declare module 'rxjs/Observable' {
interface Observable<T> {
pausable: typeof pausable;
}
}
function pausable<T>(notifier: Observable<boolean>): Observable<T> {
return notifier.startWith(false).switchMap((paused) => {
if (paused) {
return Observable.never();
} else {
const source = new Subject();
this.subscribe(source);
return source;
}
});
}
Observable.prototype.pausable = pausable;
skipNextWhen.ts
import { Observable } from 'rxjs/Observable';
import './pausable';
declare module 'rxjs/Observable' {
interface Observable<T> {
skipNextWhen: typeof skipNextWhen;
}
}
function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
const notifier = Observable.merge(this.map(() => false),
other.map(() => true));
return this.pausable(notifier);
}
Observable.prototype.skipNextWhen = skipNextWhen;
是否有更合适的运算符我应该考虑改用?我在当前实现中看到的行为是结果 Observable 发出一次,然后再也不会发出——即使通知器 Observable 从不发出。
我可以想到两个解决方案:
使用
.filter()
、.do()
和一些副作用。这个解决方案可能更容易理解,尽管它不是 "Rx" 的方式:
function skipNextWhen(other) { let skipNext = false; return this.merge(other.do(() => skipNext = true).filter(() => false)) .filter(val => { const doSkip = skipNext; skipNext = false; return !doSkip; }); }
我使用
merge()
只是为了更新skipNext
,other
的值总是被忽略。使用
.scan()
:这个解决方案没有任何状态变量和副作用。
function skipNextWhen(other) { const SKIP = 'skip'; return this.merge(other.mapTo(SKIP)) .scan((acc, val) => { if (acc === SKIP) { return null; } else if (val === SKIP) { return SKIP; } else { return val; } }, []) .filter(val => Boolean(val) && val !== SKIP); }
基本上,当
SKIP
到达时,我会立即 return 因为它会被scan()
运算符再次传递到acc
参数中,然后被filter()
.如果我收到一个正常值但之前的值是
SKIP
我会忽略它并且 return 只是null
后来被过滤掉。
两种解决方案给出相同的结果:
Observable.prototype.skipNextWhen = skipNextWhen;
const source = Observable.range(1, 10)
.concatMap(val => Observable.of(val).delay(100));
source
.skipNextWhen(Observable.interval(350))
.subscribe(console.log);
这将打印以下内容:
1
2
3
5
6
8
9
10
请注意,您实际上并不是在创建新的运算符。您只有一个操作员链的快捷方式。例如,这不会让您在源完成时取消订阅 other
。
我已经开始了一个(非常)小的库,其中包含一些我想要的 rxjs 实用程序。它恰好有一个函数可以完全按照你的要求去做:skipAfter
。来自文档:
source: -1-----2-----3-----4-----5-|
skip$: ----0----------0-0----------
result: -1-----------3-----------5-|