RxJs:条件为真时缓冲事件,条件为假时传递事件
RxJs: buffer events when condition is true, pass events through when condition is false
我在下面创建了 Observable 构造函数,它按描述工作。有谁知道使用 RxJs 附带的运算符是否有更简洁的方法来实现相同的行为?我正在查看接近所需行为的 bufferToggle,但我需要在缓冲区关闭时传递发出的值。
函数说明:如果condition
发出true
,则缓冲发出的source
值,并通过发出的source
] 值,如果 condition
发出 false
。如果条件在 true
之后发出 false
,缓冲区将按照接收到的顺序释放每个值。缓冲区初始化为通过发出的 source
值,直到 condition
发出 true
.
function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
return new Observable<T>(subscriber => {
const subscriptions: Subscription[] = [];
const buffer = [];
let isBufferOpen = false;
subscriptions.push(
// handle source events
source.subscribe(value => {
// if buffer is open, or closed but buffer is still being
// emptied from previously being closed.
if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
buffer.push(value);
} else {
subscriber.next(value);
}
}),
// handle condition events
condition.do(value => isBufferOpen = value)
.filter(value => !value)
.subscribe(value => {
while (buffer.length > 0 && !isBufferOpen) {
subscriber.next(buffer.shift());
}
})
);
// on unsubscribe
return () => {
subscriptions.forEach(sub => sub.unsubscribe());
};
});
}
编辑
作为对评论的回应,以下功能与上述功能相同,但采用 RxJs 运算符的形式,并更新为使用 RxJx 6+ pipeabale 运算符:
function bufferIf<T>(condition: Observable<boolean>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => {
return new Observable<T>(subscriber => {
const subscriptions: Subscription[] = [];
const buffer: T[] = [];
let isBufferOpen = false;
subscriptions.push(
// handle source events
source.subscribe(value => {
// if buffer is open, or closed but buffer is still being
// emptied from previously being closed.
if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
buffer.push(value);
} else {
subscriber.next(value);
}
}),
// handle condition events
condition.pipe(
tap(con => isBufferOpen = con),
filter(() => !isBufferOpen)
).subscribe(() => {
while (buffer.length > 0 && !isBufferOpen) {
subscriber.next(buffer.shift());
}
})
);
// on unsubscribe
return () => subscriptions.forEach(sub => sub.unsubscribe());
});
}
}
我找到了一个基于运算符而不是订阅的解决方案,但犹豫是否称它更简洁。
请注意,如果可以保证缓冲区 on/off 流始终以关闭结束(即发出奇数),则可以删除 endToken。
console.clear()
const Observable = Rx.Observable
// Source and buffering observables
const source$ = Observable.timer(0, 200).take(15)
const bufferIt$ = Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)
// Function to switch buffering
const endToken = 'end'
const bufferScanner = { buffering: false, value: null, buffer: [] }
const bufferSwitch = (scanner, [src, buffering]) => {
const onBufferClose = (scanner.buffering && !buffering) || (src === endToken)
const buffer = (buffering || onBufferClose) ? scanner.buffer.concat(src) : []
const value = onBufferClose ? buffer : buffering ? null : [src]
return { buffering, value, buffer }
}
// Operator chain
const output =
source$
.concat(Observable.of(endToken)) // signal last buffer to emit
.withLatestFrom(bufferIt$) // add buffering flag to stream
.scan(bufferSwitch, bufferScanner) // turn buffering on and off
.map(x => x.value) // deconsruct bufferScanner
.filter(x => x) // ignore null values
.mergeAll() // deconstruct buffer array
.filter(x => x !== endToken) // ignore endToken
// Proof
const start = new Date()
const outputDisplay = output.timestamp()
.map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start) )
const bufferDisplay = bufferIt$.timestamp()
.map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start) )
bufferDisplay.merge(outputDisplay)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
脚注
我还找到了一个基于 buffer()
的解决方案,但我不相信它在高频源下是否稳定。某些缓冲区配置似乎有些矫揉造作(即声明看起来不错,但测试显示偶尔的延迟会干扰缓冲区操作)。
无论如何,供参考,
/*
Alternate with buffered and unbuffered streams
*/
const buffered =
source$.withLatestFrom(bufferIt$)
.filter(([x, bufferIsOn]) => bufferIsOn)
.map(x => x[0])
.buffer(bufferIt$.filter(x => !x))
.filter(x => x.length) // filter out empty buffers
.mergeAll() // unwind the buffer
const unbuffered =
source$.withLatestFrom(bufferIt$)
.filter(([x, bufferIsOn]) => !bufferIsOn)
.map(x => x[0])
const output = buffered.merge(unbuffered)
使用热 observable
这是另一种方式,稍微简短一些(添加一个新的答案,因为之前的答案很忙)
// Source and buffering observables
const source$ = Rx.Observable.timer(0, 200).take(15)
const bufferIt$ = Rx.Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)
const makeHot$ = (src) => {
const hot$ = new Rx.Subject();
src.subscribe(x => hot$.next(x));
return hot$;
}
// Buffered output
const buffered$ = (source, bufferIt) => {
const hot$ = makeHot$(source)
const close = new Rx.Subject()
return bufferIt
.concat(Rx.Observable.of(false)) // ensure last buffer emits
.do(x => {if(!x) close.next(true)} ) // close previous buffer
.switchMap(x => x ? hot$.buffer(close) : hot$.map(x=>[x]))
.mergeAll()
}
// Proof
const start = new Date()
const outputDisplay = buffered$(source$, bufferIt$).timestamp()
.map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start) )
const bufferDisplay = bufferIt$.timestamp()
.map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start) )
bufferDisplay.merge(outputDisplay)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
我最近一直在寻找类似的解决方案,终于想到了这个。
可能会帮助其他人,假设一些去抖动时间是可以接受的(也许是可取的)
source$.pipe(buffer(source$.pipe(
debounceTime(500),
filter(condition)
)))
我在下面创建了 Observable 构造函数,它按描述工作。有谁知道使用 RxJs 附带的运算符是否有更简洁的方法来实现相同的行为?我正在查看接近所需行为的 bufferToggle,但我需要在缓冲区关闭时传递发出的值。
函数说明:如果condition
发出true
,则缓冲发出的source
值,并通过发出的source
] 值,如果 condition
发出 false
。如果条件在 true
之后发出 false
,缓冲区将按照接收到的顺序释放每个值。缓冲区初始化为通过发出的 source
值,直到 condition
发出 true
.
function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
return new Observable<T>(subscriber => {
const subscriptions: Subscription[] = [];
const buffer = [];
let isBufferOpen = false;
subscriptions.push(
// handle source events
source.subscribe(value => {
// if buffer is open, or closed but buffer is still being
// emptied from previously being closed.
if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
buffer.push(value);
} else {
subscriber.next(value);
}
}),
// handle condition events
condition.do(value => isBufferOpen = value)
.filter(value => !value)
.subscribe(value => {
while (buffer.length > 0 && !isBufferOpen) {
subscriber.next(buffer.shift());
}
})
);
// on unsubscribe
return () => {
subscriptions.forEach(sub => sub.unsubscribe());
};
});
}
编辑
作为对评论的回应,以下功能与上述功能相同,但采用 RxJs 运算符的形式,并更新为使用 RxJx 6+ pipeabale 运算符:
function bufferIf<T>(condition: Observable<boolean>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => {
return new Observable<T>(subscriber => {
const subscriptions: Subscription[] = [];
const buffer: T[] = [];
let isBufferOpen = false;
subscriptions.push(
// handle source events
source.subscribe(value => {
// if buffer is open, or closed but buffer is still being
// emptied from previously being closed.
if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
buffer.push(value);
} else {
subscriber.next(value);
}
}),
// handle condition events
condition.pipe(
tap(con => isBufferOpen = con),
filter(() => !isBufferOpen)
).subscribe(() => {
while (buffer.length > 0 && !isBufferOpen) {
subscriber.next(buffer.shift());
}
})
);
// on unsubscribe
return () => subscriptions.forEach(sub => sub.unsubscribe());
});
}
}
我找到了一个基于运算符而不是订阅的解决方案,但犹豫是否称它更简洁。
请注意,如果可以保证缓冲区 on/off 流始终以关闭结束(即发出奇数),则可以删除 endToken。
console.clear()
const Observable = Rx.Observable
// Source and buffering observables
const source$ = Observable.timer(0, 200).take(15)
const bufferIt$ = Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)
// Function to switch buffering
const endToken = 'end'
const bufferScanner = { buffering: false, value: null, buffer: [] }
const bufferSwitch = (scanner, [src, buffering]) => {
const onBufferClose = (scanner.buffering && !buffering) || (src === endToken)
const buffer = (buffering || onBufferClose) ? scanner.buffer.concat(src) : []
const value = onBufferClose ? buffer : buffering ? null : [src]
return { buffering, value, buffer }
}
// Operator chain
const output =
source$
.concat(Observable.of(endToken)) // signal last buffer to emit
.withLatestFrom(bufferIt$) // add buffering flag to stream
.scan(bufferSwitch, bufferScanner) // turn buffering on and off
.map(x => x.value) // deconsruct bufferScanner
.filter(x => x) // ignore null values
.mergeAll() // deconstruct buffer array
.filter(x => x !== endToken) // ignore endToken
// Proof
const start = new Date()
const outputDisplay = output.timestamp()
.map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start) )
const bufferDisplay = bufferIt$.timestamp()
.map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start) )
bufferDisplay.merge(outputDisplay)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
脚注
我还找到了一个基于 buffer()
的解决方案,但我不相信它在高频源下是否稳定。某些缓冲区配置似乎有些矫揉造作(即声明看起来不错,但测试显示偶尔的延迟会干扰缓冲区操作)。
无论如何,供参考,
/*
Alternate with buffered and unbuffered streams
*/
const buffered =
source$.withLatestFrom(bufferIt$)
.filter(([x, bufferIsOn]) => bufferIsOn)
.map(x => x[0])
.buffer(bufferIt$.filter(x => !x))
.filter(x => x.length) // filter out empty buffers
.mergeAll() // unwind the buffer
const unbuffered =
source$.withLatestFrom(bufferIt$)
.filter(([x, bufferIsOn]) => !bufferIsOn)
.map(x => x[0])
const output = buffered.merge(unbuffered)
使用热 observable
这是另一种方式,稍微简短一些(添加一个新的答案,因为之前的答案很忙)
// Source and buffering observables
const source$ = Rx.Observable.timer(0, 200).take(15)
const bufferIt$ = Rx.Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)
const makeHot$ = (src) => {
const hot$ = new Rx.Subject();
src.subscribe(x => hot$.next(x));
return hot$;
}
// Buffered output
const buffered$ = (source, bufferIt) => {
const hot$ = makeHot$(source)
const close = new Rx.Subject()
return bufferIt
.concat(Rx.Observable.of(false)) // ensure last buffer emits
.do(x => {if(!x) close.next(true)} ) // close previous buffer
.switchMap(x => x ? hot$.buffer(close) : hot$.map(x=>[x]))
.mergeAll()
}
// Proof
const start = new Date()
const outputDisplay = buffered$(source$, bufferIt$).timestamp()
.map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start) )
const bufferDisplay = bufferIt$.timestamp()
.map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start) )
bufferDisplay.merge(outputDisplay)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
我最近一直在寻找类似的解决方案,终于想到了这个。
可能会帮助其他人,假设一些去抖动时间是可以接受的(也许是可取的)
source$.pipe(buffer(source$.pipe(
debounceTime(500),
filter(condition)
)))