rxjs 嵌套可观察对象 window 和扫描的意外行为
Unexpected behavior with rxjs nested observables, window, and scan
我想在数据输入时显示分析的部分结果。为每个新值重新计算效率非常低(与 'scan' 一样)。但是,在这种情况下,我可以对数据块进行分析并将这些结果结合起来。所以我一直在使用 'window' 来分解数据,然后 'scan' 来组合每个 window 计算的结果。结果本身就是一个可观察对象,因此将其作为嵌套可观察对象发出是很自然的。此外,该过程的下一步在使用可观察对象时效果非常好。
但是,我无法按预期让它工作。 (我确实通过一个笨拙的步骤将内部可观察对象转换为数组,然后再转换回可观察对象。)似乎有些事情我不明白 "window" and/or "scan" .
这里有两个示例,它们在生成嵌套可观察对象的方式上有所不同。我本以为以下两个示例会给出相同的结果,但事实并非如此。
首先,我直接创建嵌套的可观察对象。第二,我使用 window 操作创建它。然后,在这两种情况下,我都对嵌套的可观察对象应用相同的扫描。
这符合我的预期:
rxjs.from([rxjs.from([1, 2]), rxjs.from([3, 4])])
.pipe(
ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
).subscribe(win => win.subscribe(
x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
e => console.log("outer error"), () => console.log("outer |"))
对于每个发出的可观察值,我看到前一个值的累积,然后是新值。
1 2 | 1 2 3 4 |
我预计下一个会产生相同的结果,但事实并非如此:
rxjs.from([1, 2, 3, 4])
.pipe(
ops.windowCount(2),
ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
).subscribe(win => win.subscribe(x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
e => console.log("outer error"), () => console.log("outer|"))
似乎有效地忽略了扫描操作并发出原始的windows,
1 2 | 3 4 |
我错过了什么?传统的解决方案是什么样的?谢谢!
windowCount
正在内部使用 Subject
。因此它创建并 returns 一个 Subject,然后将 1
和 2
发送给它作为第一个 window。在第一个 scan
迭代中,您在 1
和 2
发送和接收这些值之前订阅了该主题。对于以后的迭代,您在 1
和 2
已经发出后订阅,因此您不会再次收到这些值。
有点喜欢:
const { Subject, merge, from } = rxjs
const window1 = new Subject()
const scanResult1 = merge(from([]), window1)
scanResult1.subscribe(console.log)
window1.next(1)
window1.next(2)
console.log('|')
const window2 = new Subject()
const scanResult2 = merge(scanResult1, window2)
scanResult2.subscribe(console.log)
window2.next(3)
window2.next(4)
<script src="https://unpkg.com/@reactivex/rxjs@6.5.5/dist/global/rxjs.umd.js"></script>
使用 bufferCount
您只需将 windowCount
替换为 bufferCount
即可将数组发送到 scan
而不是主题。 scan
中的代码可以与 merge
中的代码保持一致,也可以处理数组,但是如果要保证值在他们进来的顺序相同。
rxjs.from([1, 2, 3, 4])
.pipe(
ops.bufferCount(2),
ops.scan((acc, curr) => rxjs.concat(acc, curr), rxjs.from([]))
).subscribe(
win => win.subscribe(
x => console.log(JSON.stringify(x)),
e => console.log("error"), () => console.log("|")
),
e => console.log("outer error"),
() => console.log("outer|")
)
使用 window计数
您可以将 shareReplay
添加到您的 windows,以向未来的订阅者重播它们的价值。如果源的计数可以被 windowSize 整除,windowCount
在末尾发出一个空的 window,只有当当前 window 不为空。否则你会得到两次最终结果。
from([1, 2, 3, 4]).pipe(
windowCount(2),
scan((acc, curr) => {
const shared = curr.pipe(shareReplay())
return shared.pipe(
isEmpty(),
switchMap(empty => empty ? EMPTY : merge(acc, shared))
)
}, from([]))
)
或者
from([1, 2, 3, 4]).pipe(
windowCount(2),
map(w => w.pipe(shareReplay())),
concatMap(w => w.pipe(isEmpty(), filter(e => !e), mapTo(w))),
scan((acc, curr) => merge(acc, curr), from([]))
)
我想在数据输入时显示分析的部分结果。为每个新值重新计算效率非常低(与 'scan' 一样)。但是,在这种情况下,我可以对数据块进行分析并将这些结果结合起来。所以我一直在使用 'window' 来分解数据,然后 'scan' 来组合每个 window 计算的结果。结果本身就是一个可观察对象,因此将其作为嵌套可观察对象发出是很自然的。此外,该过程的下一步在使用可观察对象时效果非常好。
但是,我无法按预期让它工作。 (我确实通过一个笨拙的步骤将内部可观察对象转换为数组,然后再转换回可观察对象。)似乎有些事情我不明白 "window" and/or "scan" .
这里有两个示例,它们在生成嵌套可观察对象的方式上有所不同。我本以为以下两个示例会给出相同的结果,但事实并非如此。
首先,我直接创建嵌套的可观察对象。第二,我使用 window 操作创建它。然后,在这两种情况下,我都对嵌套的可观察对象应用相同的扫描。
这符合我的预期:
rxjs.from([rxjs.from([1, 2]), rxjs.from([3, 4])])
.pipe(
ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
).subscribe(win => win.subscribe(
x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
e => console.log("outer error"), () => console.log("outer |"))
对于每个发出的可观察值,我看到前一个值的累积,然后是新值。
1 2 | 1 2 3 4 |
我预计下一个会产生相同的结果,但事实并非如此:
rxjs.from([1, 2, 3, 4])
.pipe(
ops.windowCount(2),
ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
).subscribe(win => win.subscribe(x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
e => console.log("outer error"), () => console.log("outer|"))
似乎有效地忽略了扫描操作并发出原始的windows,
1 2 | 3 4 |
我错过了什么?传统的解决方案是什么样的?谢谢!
windowCount
正在内部使用 Subject
。因此它创建并 returns 一个 Subject,然后将 1
和 2
发送给它作为第一个 window。在第一个 scan
迭代中,您在 1
和 2
发送和接收这些值之前订阅了该主题。对于以后的迭代,您在 1
和 2
已经发出后订阅,因此您不会再次收到这些值。
有点喜欢:
const { Subject, merge, from } = rxjs
const window1 = new Subject()
const scanResult1 = merge(from([]), window1)
scanResult1.subscribe(console.log)
window1.next(1)
window1.next(2)
console.log('|')
const window2 = new Subject()
const scanResult2 = merge(scanResult1, window2)
scanResult2.subscribe(console.log)
window2.next(3)
window2.next(4)
<script src="https://unpkg.com/@reactivex/rxjs@6.5.5/dist/global/rxjs.umd.js"></script>
使用 bufferCount
您只需将 windowCount
替换为 bufferCount
即可将数组发送到 scan
而不是主题。 scan
中的代码可以与 merge
中的代码保持一致,也可以处理数组,但是如果要保证值在他们进来的顺序相同。
rxjs.from([1, 2, 3, 4])
.pipe(
ops.bufferCount(2),
ops.scan((acc, curr) => rxjs.concat(acc, curr), rxjs.from([]))
).subscribe(
win => win.subscribe(
x => console.log(JSON.stringify(x)),
e => console.log("error"), () => console.log("|")
),
e => console.log("outer error"),
() => console.log("outer|")
)
使用 window计数
您可以将 shareReplay
添加到您的 windows,以向未来的订阅者重播它们的价值。如果源的计数可以被 windowSize 整除,windowCount
在末尾发出一个空的 window,只有当当前 window 不为空。否则你会得到两次最终结果。
from([1, 2, 3, 4]).pipe(
windowCount(2),
scan((acc, curr) => {
const shared = curr.pipe(shareReplay())
return shared.pipe(
isEmpty(),
switchMap(empty => empty ? EMPTY : merge(acc, shared))
)
}, from([]))
)
或者
from([1, 2, 3, 4]).pipe(
windowCount(2),
map(w => w.pipe(shareReplay())),
concatMap(w => w.pipe(isEmpty(), filter(e => !e), mapTo(w))),
scan((acc, curr) => merge(acc, curr), from([]))
)