rxjs 嵌套可观察对象 window 和扫描的意外行为

Unexpected behavior with rxjs nested observables, window, and scan

我想在数据输入时显示分析的部分结果。为每个新值重新计算效率非常低(与 'scan' 一样)。但是,在这种情况下,我可以对数据块进行分析并将这些结果结合起来。所以我一直在使用 'window' 来分解数据,然后 'scan' 来组合每个 window 计算的结果。结果本身就是一个可观察对象,因此将其作为嵌套可观察对象发出是很自然的。此外,该过程的下一步在使用可观察对象时效果非常好。

但是,我无法按预期让它工作。 (我确实通过一个笨拙的步骤将内部可观察对象转换为数组,然后再转换回可观察对象。)似乎有些事情我不明白 "window" and/or "scan" .

这里有两个示例,它们在生成嵌套可观察对象的方式上有所不同。我本以为以下两个示例会给出相同的结果,但事实并非如此。

首先,我直接创建嵌套的可观察对象。第二,我使用 window 操作创建它。然后,在这两种情况下,我都对嵌套的可观察对象应用相同的扫描。

这符合我的预期:

rxjs.from([rxjs.from([1, 2]), rxjs.from([3, 4])])
    .pipe(
        ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
    ).subscribe(win => win.subscribe(
        x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
        e => console.log("outer error"), () => console.log("outer |"))

对于每个发出的可观察值,我看到前一个值的累积,然后是新值。 1 2 | 1 2 3 4 |

我预计下一个会产生相同的结果,但事实并非如此:

rxjs.from([1, 2, 3, 4])
    .pipe(
        ops.windowCount(2),
        ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
    ).subscribe(win => win.subscribe(x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
        e => console.log("outer error"), () => console.log("outer|"))

似乎有效地忽略了扫描操作并发出原始的windows, 1 2 | 3 4 |

我错过了什么?传统的解决方案是什么样的?谢谢!

windowCount 正在内部使用 Subject。因此它创建并 returns 一个 Subject,然后将 12 发送给它作为第一个 window。在第一个 scan 迭代中,您在 12 发送和接收这些值之前订阅了该主题。对于以后的迭代,您在 12 已经发出后订阅,因此您不会再次收到这些值。

有点喜欢:

const { Subject, merge, from } = rxjs

const window1 = new Subject()
const scanResult1 = merge(from([]), window1)
scanResult1.subscribe(console.log)
window1.next(1)
window1.next(2)

console.log('|')

const window2 = new Subject()
const scanResult2 = merge(scanResult1, window2)
scanResult2.subscribe(console.log)
window2.next(3)
window2.next(4)
<script src="https://unpkg.com/@reactivex/rxjs@6.5.5/dist/global/rxjs.umd.js"></script>

使用 bufferCount

您只需将 windowCount 替换为 bufferCount 即可将数组发送到 scan 而不是主题。 scan 中的代码可以与 merge 中的代码保持一致,也可以处理数组,但是如果要保证值在他们进来的顺序相同。

rxjs.from([1, 2, 3, 4])
  .pipe(
    ops.bufferCount(2),
    ops.scan((acc, curr) => rxjs.concat(acc, curr), rxjs.from([]))
  ).subscribe(
    win => win.subscribe(
      x => console.log(JSON.stringify(x)), 
      e => console.log("error"), () => console.log("|")
    ),
    e => console.log("outer error"),
    () => console.log("outer|")
  )

使用 window计数

您可以将 shareReplay 添加到您的 windows,以向未来的订阅者重播它们的价值。如果源的计数可以被 windowSize 整除,windowCount 在末尾发出一个空的 window,只有当当前 window 不为空。否则你会得到两次最终结果。

from([1, 2, 3, 4]).pipe(
  windowCount(2),
  scan((acc, curr) => {
    const shared = curr.pipe(shareReplay())
    return shared.pipe(
      isEmpty(),
      switchMap(empty => empty ? EMPTY : merge(acc, shared))
    )
  }, from([]))
)

或者

from([1, 2, 3, 4]).pipe(
  windowCount(2),
  map(w => w.pipe(shareReplay())),
  concatMap(w => w.pipe(isEmpty(), filter(e => !e), mapTo(w))),
  scan((acc, curr) => merge(acc, curr), from([]))
)