从不可预测的源 Observable 构建 "Heartbeat" Observable
Build "Heartbeat" Observable from Unpredictable Source Observable
我有一个 Observable,source
,它可能会在不可预测的时间发出项目。我正在尝试使用它来构建另一个每 500 毫秒可靠地发出其值的 Observable。
假设 source
在这些时间发出值:
- 100 毫秒 - 第一项
- 980 毫秒 - 第二项
- 1020 毫秒 - 第三项
- 1300ms - 第四项等
我想“平滑”这个流,以便我得到如下输出:
- 500 毫秒 - 第一项
- 1000 毫秒 - 第二项
- 1500 毫秒 - 第三项
- 2000ms - 第四项
一种天真的方法可能只是在源项的排放之间添加延迟。但是,这不会像我想要的那样创建均匀间隔的间隔。
我已经尝试了 .timer()
、.interval()
和 .flatMap()
的各种组合,但没有任何希望。
您可以结合使用 combineLatest
、interval
和 throttle
- 您可以添加第二个可观察对象,interval
以及您想要的调用间隔时间(例如 500 毫秒), 所以你的 observable 将每 500 毫秒发射一次(当与 combineLatest
一起使用时),现在它将每 500 毫秒发射一次值 和 每次原始 source
发射时,所以你可以在管道中添加 throttle
,这将导致间隔节流:
combineLatest([source, timer(5000)])
.pipe(
throttle(() => interval(5000)),
tap(([value]) => {
console.log("emitted", value, new Date().getSeconds());
})
)
.subscribe();
(tap
这里不用加,只是为了演示)
我想你可以试试这个:
const src$ = merge(
timer(100).pipe(mapTo(1)),
timer(980).pipe(mapTo(2)),
timer(1020).pipe(mapTo(3)),
timer(1300).pipe(mapTo(4))
);
src$
.pipe(
bufferTime(500),
mergeAll()
)
.subscribe(console.log);
bufferTime
用于创建一个计时器,该计时器将以恒定间隔发出,而不管发出的值如何。然后 mergeAll
用于 分解 由 bufferTime
.
产生的数组
对于比您的间隔发射得更快的源
zip
所需时间跨度 interval
的来源。
zip(source, interval(500)).pipe(
map(([value, _]) => value) // only emit the source value
)
zip
发出来自 source
的第一项和来自 interval
的第一项,然后发出来自 source
的第二项和来自 interval
的第二项] 等等。如果输出 observable 只应在 interval
发出时发出,则 source
的第 N 个值必须在 来自 interval
的第 N 个值之前到达 。
潜在问题:
如果您的 source
在某些时候发出的速度比 interval
慢(即来自 source
的第 N 个值到达 在 来自 interval
的第 N 个值之后) 那么 zip
会直接发射而不用等待下一次 interval
发射。
// the 5th/6th value from source arrive after the 5th/6th value from interval
v v
source: -1--------2-3---4---------------5----6-----
interval: -----1-----2-----3-----4-----5-----6-----7-
zip output: -----1-----2-----3-----4--------5----6-----
✓ ✓ ✓ ✓ ⚠️ ⚠️
// emits 5 and 6 don't happen when interval emits
对于以任何速率发射的源
function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) =>
defer(() => {
let sourceCompleted = false;
const queue = source.pipe(
tap({ complete: () => (sourceCompleted = true) }),
scan((acc, curr) => (acc.push(curr), acc), []) // collect all values in a buffer
);
return interval(period).pipe(
withLatestFrom(queue), // combine with the latest buffer
takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete when the source completed and the buffer is empty
filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least on value in the buffer
map(([_, buffer]) => buffer.shift()) // take the first value from the buffer
);
});
}
source.pipe(
emitOnInterval(500)
)
// the 5th/6th value from source arrive after the 5th/6th value from interval
v v
source: -1--------2-3---4---------------5----6-----
interval: -----1-----2-----3-----4-----5-----6-----7-
output: -----1-----2-----3-----4-----------5-----6-
✓ ✓ ✓ ✓ ✓ ✓
// all output emits happen when interval emits
我有一个 Observable,source
,它可能会在不可预测的时间发出项目。我正在尝试使用它来构建另一个每 500 毫秒可靠地发出其值的 Observable。
假设 source
在这些时间发出值:
- 100 毫秒 - 第一项
- 980 毫秒 - 第二项
- 1020 毫秒 - 第三项
- 1300ms - 第四项等
我想“平滑”这个流,以便我得到如下输出:
- 500 毫秒 - 第一项
- 1000 毫秒 - 第二项
- 1500 毫秒 - 第三项
- 2000ms - 第四项
一种天真的方法可能只是在源项的排放之间添加延迟。但是,这不会像我想要的那样创建均匀间隔的间隔。
我已经尝试了 .timer()
、.interval()
和 .flatMap()
的各种组合,但没有任何希望。
您可以结合使用 combineLatest
、interval
和 throttle
- 您可以添加第二个可观察对象,interval
以及您想要的调用间隔时间(例如 500 毫秒), 所以你的 observable 将每 500 毫秒发射一次(当与 combineLatest
一起使用时),现在它将每 500 毫秒发射一次值 和 每次原始 source
发射时,所以你可以在管道中添加 throttle
,这将导致间隔节流:
combineLatest([source, timer(5000)])
.pipe(
throttle(() => interval(5000)),
tap(([value]) => {
console.log("emitted", value, new Date().getSeconds());
})
)
.subscribe();
(tap
这里不用加,只是为了演示)
我想你可以试试这个:
const src$ = merge(
timer(100).pipe(mapTo(1)),
timer(980).pipe(mapTo(2)),
timer(1020).pipe(mapTo(3)),
timer(1300).pipe(mapTo(4))
);
src$
.pipe(
bufferTime(500),
mergeAll()
)
.subscribe(console.log);
bufferTime
用于创建一个计时器,该计时器将以恒定间隔发出,而不管发出的值如何。然后 mergeAll
用于 分解 由 bufferTime
.
对于比您的间隔发射得更快的源
zip
所需时间跨度 interval
的来源。
zip(source, interval(500)).pipe(
map(([value, _]) => value) // only emit the source value
)
zip
发出来自 source
的第一项和来自 interval
的第一项,然后发出来自 source
的第二项和来自 interval
的第二项] 等等。如果输出 observable 只应在 interval
发出时发出,则 source
的第 N 个值必须在 来自 interval
的第 N 个值之前到达 。
潜在问题:
如果您的 source
在某些时候发出的速度比 interval
慢(即来自 source
的第 N 个值到达 在 来自 interval
的第 N 个值之后) 那么 zip
会直接发射而不用等待下一次 interval
发射。
// the 5th/6th value from source arrive after the 5th/6th value from interval
v v
source: -1--------2-3---4---------------5----6-----
interval: -----1-----2-----3-----4-----5-----6-----7-
zip output: -----1-----2-----3-----4--------5----6-----
✓ ✓ ✓ ✓ ⚠️ ⚠️
// emits 5 and 6 don't happen when interval emits
对于以任何速率发射的源
function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) =>
defer(() => {
let sourceCompleted = false;
const queue = source.pipe(
tap({ complete: () => (sourceCompleted = true) }),
scan((acc, curr) => (acc.push(curr), acc), []) // collect all values in a buffer
);
return interval(period).pipe(
withLatestFrom(queue), // combine with the latest buffer
takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete when the source completed and the buffer is empty
filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least on value in the buffer
map(([_, buffer]) => buffer.shift()) // take the first value from the buffer
);
});
}
source.pipe(
emitOnInterval(500)
)
// the 5th/6th value from source arrive after the 5th/6th value from interval
v v
source: -1--------2-3---4---------------5----6-----
interval: -----1-----2-----3-----4-----5-----6-----7-
output: -----1-----2-----3-----4-----------5-----6-
✓ ✓ ✓ ✓ ✓ ✓
// all output emits happen when interval emits