合并的 observable 不包含序列,即使 mergees 包含序列

Merged observable does not contain sequence even though mergees do

我有一个函数,它本质上接受一个 DOM 元素,寻找一些识别信息,并且(理论上)returns 一个包含上述识别信息的 Observable withLatestFrom编辑了一些其他东西。

我的问题是返回的 Observable 没有发出任何 whatchamacallits,即使 primaries$highlights$ observables 它是由 do 制成的。

如果解释不当,我非常抱歉,我是 ReactiveX/RxJS 的新手,我正在尽力而为;如果您需要更多信息,请询问。

function randomFunction(element) {
  // Create an observable sequence of text nodes from an array
  const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))

  // Get "highlights" by doing stuff with text nodes
  const highlights$ = textNodes$
    .map(doSomeStuff)
    .zip(textNodes$, (res, node) => ({res, node}))
    .filter(({res, node}) => res.length && node.isConnected)

  // Get "primaries" by doing stuff with "highlights"
  const primaries$ = highlights$
    .map(x => x.res)
    .flatMap(x => x.filter(y => y.matches && y.isPrimary))
    .map(x => x.id)
    .toSet()

  // Create return observable from highlights and primaries
  const ret$ = highlights$.withLatestFrom(primaries$)

  // These work
  primaries$.subscribe(x => { console.log("primaries:", x) })
  highlights$.subscribe(x => { console.log("highlights:", x) })

  // This gives me nothing
  ret$.subscribe(x => { console.log("return:", x) })

  return ret$
}

谢谢!

我观察到您两次使用了一些可观察对象,在这种情况下,您可能应该 share 那些通用源可观察对象。例如,highlights$ 将被订阅两次,一次是为 ret 生成值,一次是为 primaries.

生成值

除此之外,您可以使用 dotap 运算符放置日志记录,这将在不影响流的情况下执行日志记录副作用。您确实需要一个 subscribe 来启动数据流。当您订阅 ret 时,它会订阅 highlightprimaries 并沿着订阅链上升。

最后的话,多个订阅在 Rxjs 中很棘手。您需要了解热与冷的二分法才能理解会发生什么。您可以找到内部工作的详细检查

所以像这样:

function randomFunction(element) {
  // Create an observable sequence of text nodes from an array
  const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))

  // Get "highlights" by doing stuff with text nodes
  const highlights$ = textNodes$
    .map(doSomeStuff)
    .zip(textNodes$, (res, node) => ({res, node}))
    .filter(({res, node}) => res.length && node.isConnected)
    .share()
    .tap(console.log.bind(console, 'highlights:'))

  // Get "primaries" by doing stuff with "highlights"
  const primaries$ = highlights$
    .map(x => x.res)
    .flatMap(x => x.filter(y => y.matches && y.isPrimary))
    .map(x => x.id)
    .toSet()
    .tap(console.log.bind(console, 'primaries:'))

  // Create return observable from highlights and primaries
  const ret$ = highlights$.withLatestFrom(primaries$)

  // These work
  ret$.subscribe(x => { console.log("return:", x) })

  return ret$
}

根据 user3743222 在他们的回答中所说的内容,事实证明我在创建 ret$ 时还需要 pause/buffer highlights$,并等待 primaries$ 发出一些东西继续。下面是我如何使用 Rx.Observable.prototype.pausableBuffered

function randomFunction(element) {
  // Create an observable sequence of text nodes from an array
  const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))

  // Get "highlights" by doing stuff with text nodes
  const highlights$ = textNodes$
    .map(doSomeStuff)
    .zip(textNodes$, (res, node) => ({res, node}))
    .filter(({res, node}) => res.length && node.isConnected)
    .share()
    .tap(console.log.bind(console, 'highlights:'))

  // Get "primaries" by doing stuff with "highlights"
  const primaries$ = highlights$
    .map(x => x.res)
    .flatMap(x => x.filter(y => y.matches && y.isPrimary))
    .map(x => x.id)
    .toSet()
    .tap(console.log.bind(console, 'primaries:'))

  // Observable that outputs true when primaries$ outputs anything
  const primariesExist$ = primaries$.map(() => true)

  // Create return observable from highlights and primaries
  return highlights$
    .pausableBuffered(primariesExist$)
    .withLatestFrom(primaries$)
}