为什么在此示例中在 combineLatest 和 merge 之后再次触发初始流?

Why is initial stream being triggered again after combineLatest and merge in this example?

请看下面的摘录:

  let requestStream = Rx.Observable
    .of(`${GITHUB_API}?since=${randomNumber()}`)
    .mergeMap(url => {
      console.log(`performing request to: ${url}`)
      return Rx.Observable.from(jQuery.getJSON(url))
    });

  let refreshStream = Rx.Observable.fromEvent(refreshButton, 'click')
    .startWith('click')
    .do(_ => users.empty())
    .combineLatest(requestStream, (_, users) => users.slice(randomNumber(users.length)));

  let randomUserStream = userRemovedStream
    .combineLatest(requestStream, (_, users) => users[randomNumber(users.length)]);

  requestStream
    .merge(refreshStream)
    .flatMap(users => users)
    .merge(randomUserStream)
    .filter(_ => users.children().length < MAX_SUGGESTIONS)
    .do(user => users.append(createItem(user)))
    .mergeMap(user => Rx.Observable.fromEvent($(`#close-${user.login}`), 'click'))
    .map(event => event.target.parentNode)
    .subscribe(user => {
      user.remove();
      userRemovedStream.next('');
    });

requestStream returns 一个有 100 个用户的数组,但是,我当时只消耗了其中的三个 (MAX_SUGGESTIONS)。 refreshStreamrandomUserStream 的存在是为了重用 requestStream 中的其他 97 个用户。问题是,当我 运行 上面的代码时,我仍然在控制台上看到 performing request to: ... 三次。

我注意到在最后一个流中添加 merge 方法后会发生这种情况,但是,我不确定为什么会发生这种情况。

我的理解是:当我 merge refreshStreamrandomUserStream 时,每当发出新项目时,单击前者的 refresh 按钮并单击在 remove 按钮之后,先前在 requestStream 上发出的数组将被解析并向前传递,而不是点击本身。这不应重新触发 requestStream.

有人可以帮助我理解为什么会发生这种情况以及如何处理这种情况吗? - 这样我就可以最大限度地利用 API 在第一次调用时已经返回的用户?

发生这种情况是因为您有效地订阅了三个 requestStream。然而,您对这三者如何交互的直觉是正确的,因为您的 requestStream Observable 是冷的,每次有订阅时它都会创建一个新流。

它不一定很明显,因为只有一个订阅是明确的,但每次你将 requestStream 传递给 combineLatest 它最终会创建一个新的订阅,而这个订阅又会开始一个新的订阅流,在这种情况下调用您的底层 API.

如果您不希望这种情况发生,我建议您使用像 publishLast

这样的多播运算符

所以requestStream会变成:

let requestStream = Rx.Observable
    .of(`${GITHUB_API}?since=${randomNumber()}`)
    .mergeMap(url => {
      console.log(`performing request to: ${url}`)
      return Rx.Observable.from(jQuery.getJSON(url))
    })
    .publishLast();

在这种情况下,requestStream 现在实际上是一个 ConnectableObservable,因此您还需要在某个时候启动它,通常您会等到所有订阅者都连接上。

/* Rest of you example */
.map(event => event.target.parentNode)
.subscribe(user => {
  user.remove();
  userRemovedStream.next('');
});

requestStream.connect();