为什么在此示例中在 combineLatest 和 merge 之后再次触发初始流?
Why is initial stream being triggered again after combineLatest and merge in this example?
请看下面的摘录:
let requestStream = Rx.Observable
.of(`${GITHUB_API}?since=${randomNumber()}`)
.mergeMap(url => {
console.log(`performing request to: ${url}`)
return Rx.Observable.from(jQuery.getJSON(url))
});
let refreshStream = Rx.Observable.fromEvent(refreshButton, 'click')
.startWith('click')
.do(_ => users.empty())
.combineLatest(requestStream, (_, users) => users.slice(randomNumber(users.length)));
let randomUserStream = userRemovedStream
.combineLatest(requestStream, (_, users) => users[randomNumber(users.length)]);
requestStream
.merge(refreshStream)
.flatMap(users => users)
.merge(randomUserStream)
.filter(_ => users.children().length < MAX_SUGGESTIONS)
.do(user => users.append(createItem(user)))
.mergeMap(user => Rx.Observable.fromEvent($(`#close-${user.login}`), 'click'))
.map(event => event.target.parentNode)
.subscribe(user => {
user.remove();
userRemovedStream.next('');
});
requestStream
returns 一个有 100 个用户的数组,但是,我当时只消耗了其中的三个 (MAX_SUGGESTIONS
)。 refreshStream
和 randomUserStream
的存在是为了重用 requestStream
中的其他 97 个用户。问题是,当我 运行 上面的代码时,我仍然在控制台上看到 performing request to: ...
三次。
我注意到在最后一个流中添加 merge
方法后会发生这种情况,但是,我不确定为什么会发生这种情况。
我的理解是:当我 merge
refreshStream
和 randomUserStream
时,每当发出新项目时,单击前者的 refresh
按钮并单击在 remove
按钮之后,先前在 requestStream
上发出的数组将被解析并向前传递,而不是点击本身。这不应重新触发 requestStream
.
有人可以帮助我理解为什么会发生这种情况以及如何处理这种情况吗? - 这样我就可以最大限度地利用 API 在第一次调用时已经返回的用户?
发生这种情况是因为您有效地订阅了三个 requestStream
。然而,您对这三者如何交互的直觉是正确的,因为您的 requestStream
Observable 是冷的,每次有订阅时它都会创建一个新流。
它不一定很明显,因为只有一个订阅是明确的,但每次你将 requestStream
传递给 combineLatest
它最终会创建一个新的订阅,而这个订阅又会开始一个新的订阅流,在这种情况下调用您的底层 API.
如果您不希望这种情况发生,我建议您使用像 publishLast
这样的多播运算符
所以requestStream
会变成:
let requestStream = Rx.Observable
.of(`${GITHUB_API}?since=${randomNumber()}`)
.mergeMap(url => {
console.log(`performing request to: ${url}`)
return Rx.Observable.from(jQuery.getJSON(url))
})
.publishLast();
在这种情况下,requestStream
现在实际上是一个 ConnectableObservable
,因此您还需要在某个时候启动它,通常您会等到所有订阅者都连接上。
/* Rest of you example */
.map(event => event.target.parentNode)
.subscribe(user => {
user.remove();
userRemovedStream.next('');
});
requestStream.connect();
请看下面的摘录:
let requestStream = Rx.Observable
.of(`${GITHUB_API}?since=${randomNumber()}`)
.mergeMap(url => {
console.log(`performing request to: ${url}`)
return Rx.Observable.from(jQuery.getJSON(url))
});
let refreshStream = Rx.Observable.fromEvent(refreshButton, 'click')
.startWith('click')
.do(_ => users.empty())
.combineLatest(requestStream, (_, users) => users.slice(randomNumber(users.length)));
let randomUserStream = userRemovedStream
.combineLatest(requestStream, (_, users) => users[randomNumber(users.length)]);
requestStream
.merge(refreshStream)
.flatMap(users => users)
.merge(randomUserStream)
.filter(_ => users.children().length < MAX_SUGGESTIONS)
.do(user => users.append(createItem(user)))
.mergeMap(user => Rx.Observable.fromEvent($(`#close-${user.login}`), 'click'))
.map(event => event.target.parentNode)
.subscribe(user => {
user.remove();
userRemovedStream.next('');
});
requestStream
returns 一个有 100 个用户的数组,但是,我当时只消耗了其中的三个 (MAX_SUGGESTIONS
)。 refreshStream
和 randomUserStream
的存在是为了重用 requestStream
中的其他 97 个用户。问题是,当我 运行 上面的代码时,我仍然在控制台上看到 performing request to: ...
三次。
我注意到在最后一个流中添加 merge
方法后会发生这种情况,但是,我不确定为什么会发生这种情况。
我的理解是:当我 merge
refreshStream
和 randomUserStream
时,每当发出新项目时,单击前者的 refresh
按钮并单击在 remove
按钮之后,先前在 requestStream
上发出的数组将被解析并向前传递,而不是点击本身。这不应重新触发 requestStream
.
有人可以帮助我理解为什么会发生这种情况以及如何处理这种情况吗? - 这样我就可以最大限度地利用 API 在第一次调用时已经返回的用户?
发生这种情况是因为您有效地订阅了三个 requestStream
。然而,您对这三者如何交互的直觉是正确的,因为您的 requestStream
Observable 是冷的,每次有订阅时它都会创建一个新流。
它不一定很明显,因为只有一个订阅是明确的,但每次你将 requestStream
传递给 combineLatest
它最终会创建一个新的订阅,而这个订阅又会开始一个新的订阅流,在这种情况下调用您的底层 API.
如果您不希望这种情况发生,我建议您使用像 publishLast
所以requestStream
会变成:
let requestStream = Rx.Observable
.of(`${GITHUB_API}?since=${randomNumber()}`)
.mergeMap(url => {
console.log(`performing request to: ${url}`)
return Rx.Observable.from(jQuery.getJSON(url))
})
.publishLast();
在这种情况下,requestStream
现在实际上是一个 ConnectableObservable
,因此您还需要在某个时候启动它,通常您会等到所有订阅者都连接上。
/* Rest of you example */
.map(event => event.target.parentNode)
.subscribe(user => {
user.remove();
userRemovedStream.next('');
});
requestStream.connect();