合并的 observable 不包含序列,即使 mergees 包含序列
Merged observable does not contain sequence even though mergees do
我有一个函数,它本质上接受一个 DOM 元素,寻找一些识别信息,并且(理论上)returns 一个包含上述识别信息的 Observable
withLatestFrom
编辑了一些其他东西。
我的问题是返回的 Observable
没有发出任何 whatchamacallits,即使 primaries$
和 highlights$
observables 它是由 do 制成的。
如果解释不当,我非常抱歉,我是 ReactiveX/RxJS 的新手,我正在尽力而为;如果您需要更多信息,请询问。
function randomFunction(element) {
// Create an observable sequence of text nodes from an array
const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))
// Get "highlights" by doing stuff with text nodes
const highlights$ = textNodes$
.map(doSomeStuff)
.zip(textNodes$, (res, node) => ({res, node}))
.filter(({res, node}) => res.length && node.isConnected)
// Get "primaries" by doing stuff with "highlights"
const primaries$ = highlights$
.map(x => x.res)
.flatMap(x => x.filter(y => y.matches && y.isPrimary))
.map(x => x.id)
.toSet()
// Create return observable from highlights and primaries
const ret$ = highlights$.withLatestFrom(primaries$)
// These work
primaries$.subscribe(x => { console.log("primaries:", x) })
highlights$.subscribe(x => { console.log("highlights:", x) })
// This gives me nothing
ret$.subscribe(x => { console.log("return:", x) })
return ret$
}
谢谢!
我观察到您两次使用了一些可观察对象,在这种情况下,您可能应该 share
那些通用源可观察对象。例如,highlights$
将被订阅两次,一次是为 ret
生成值,一次是为 primaries
.
生成值
除此之外,您可以使用 do
或 tap
运算符放置日志记录,这将在不影响流的情况下执行日志记录副作用。您确实需要一个 subscribe
来启动数据流。当您订阅 ret
时,它会订阅 highlight
和 primaries
并沿着订阅链上升。
最后的话,多个订阅在 Rxjs 中很棘手。您需要了解热与冷的二分法才能理解会发生什么。您可以找到内部工作的详细检查 。
所以像这样:
function randomFunction(element) {
// Create an observable sequence of text nodes from an array
const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))
// Get "highlights" by doing stuff with text nodes
const highlights$ = textNodes$
.map(doSomeStuff)
.zip(textNodes$, (res, node) => ({res, node}))
.filter(({res, node}) => res.length && node.isConnected)
.share()
.tap(console.log.bind(console, 'highlights:'))
// Get "primaries" by doing stuff with "highlights"
const primaries$ = highlights$
.map(x => x.res)
.flatMap(x => x.filter(y => y.matches && y.isPrimary))
.map(x => x.id)
.toSet()
.tap(console.log.bind(console, 'primaries:'))
// Create return observable from highlights and primaries
const ret$ = highlights$.withLatestFrom(primaries$)
// These work
ret$.subscribe(x => { console.log("return:", x) })
return ret$
}
根据 user3743222 在他们的回答中所说的内容,事实证明我在创建 ret$
时还需要 pause/buffer highlights$
,并等待 primaries$
发出一些东西继续。下面是我如何使用 Rx.Observable.prototype.pausableBuffered
function randomFunction(element) {
// Create an observable sequence of text nodes from an array
const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))
// Get "highlights" by doing stuff with text nodes
const highlights$ = textNodes$
.map(doSomeStuff)
.zip(textNodes$, (res, node) => ({res, node}))
.filter(({res, node}) => res.length && node.isConnected)
.share()
.tap(console.log.bind(console, 'highlights:'))
// Get "primaries" by doing stuff with "highlights"
const primaries$ = highlights$
.map(x => x.res)
.flatMap(x => x.filter(y => y.matches && y.isPrimary))
.map(x => x.id)
.toSet()
.tap(console.log.bind(console, 'primaries:'))
// Observable that outputs true when primaries$ outputs anything
const primariesExist$ = primaries$.map(() => true)
// Create return observable from highlights and primaries
return highlights$
.pausableBuffered(primariesExist$)
.withLatestFrom(primaries$)
}
我有一个函数,它本质上接受一个 DOM 元素,寻找一些识别信息,并且(理论上)returns 一个包含上述识别信息的 Observable
withLatestFrom
编辑了一些其他东西。
我的问题是返回的 Observable
没有发出任何 whatchamacallits,即使 primaries$
和 highlights$
observables 它是由 do 制成的。
如果解释不当,我非常抱歉,我是 ReactiveX/RxJS 的新手,我正在尽力而为;如果您需要更多信息,请询问。
function randomFunction(element) {
// Create an observable sequence of text nodes from an array
const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))
// Get "highlights" by doing stuff with text nodes
const highlights$ = textNodes$
.map(doSomeStuff)
.zip(textNodes$, (res, node) => ({res, node}))
.filter(({res, node}) => res.length && node.isConnected)
// Get "primaries" by doing stuff with "highlights"
const primaries$ = highlights$
.map(x => x.res)
.flatMap(x => x.filter(y => y.matches && y.isPrimary))
.map(x => x.id)
.toSet()
// Create return observable from highlights and primaries
const ret$ = highlights$.withLatestFrom(primaries$)
// These work
primaries$.subscribe(x => { console.log("primaries:", x) })
highlights$.subscribe(x => { console.log("highlights:", x) })
// This gives me nothing
ret$.subscribe(x => { console.log("return:", x) })
return ret$
}
谢谢!
我观察到您两次使用了一些可观察对象,在这种情况下,您可能应该 share
那些通用源可观察对象。例如,highlights$
将被订阅两次,一次是为 ret
生成值,一次是为 primaries
.
除此之外,您可以使用 do
或 tap
运算符放置日志记录,这将在不影响流的情况下执行日志记录副作用。您确实需要一个 subscribe
来启动数据流。当您订阅 ret
时,它会订阅 highlight
和 primaries
并沿着订阅链上升。
最后的话,多个订阅在 Rxjs 中很棘手。您需要了解热与冷的二分法才能理解会发生什么。您可以找到内部工作的详细检查
所以像这样:
function randomFunction(element) {
// Create an observable sequence of text nodes from an array
const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))
// Get "highlights" by doing stuff with text nodes
const highlights$ = textNodes$
.map(doSomeStuff)
.zip(textNodes$, (res, node) => ({res, node}))
.filter(({res, node}) => res.length && node.isConnected)
.share()
.tap(console.log.bind(console, 'highlights:'))
// Get "primaries" by doing stuff with "highlights"
const primaries$ = highlights$
.map(x => x.res)
.flatMap(x => x.filter(y => y.matches && y.isPrimary))
.map(x => x.id)
.toSet()
.tap(console.log.bind(console, 'primaries:'))
// Create return observable from highlights and primaries
const ret$ = highlights$.withLatestFrom(primaries$)
// These work
ret$.subscribe(x => { console.log("return:", x) })
return ret$
}
根据 user3743222 在他们的回答中所说的内容,事实证明我在创建 ret$
时还需要 pause/buffer highlights$
,并等待 primaries$
发出一些东西继续。下面是我如何使用 Rx.Observable.prototype.pausableBuffered
function randomFunction(element) {
// Create an observable sequence of text nodes from an array
const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))
// Get "highlights" by doing stuff with text nodes
const highlights$ = textNodes$
.map(doSomeStuff)
.zip(textNodes$, (res, node) => ({res, node}))
.filter(({res, node}) => res.length && node.isConnected)
.share()
.tap(console.log.bind(console, 'highlights:'))
// Get "primaries" by doing stuff with "highlights"
const primaries$ = highlights$
.map(x => x.res)
.flatMap(x => x.filter(y => y.matches && y.isPrimary))
.map(x => x.id)
.toSet()
.tap(console.log.bind(console, 'primaries:'))
// Observable that outputs true when primaries$ outputs anything
const primariesExist$ = primaries$.map(() => true)
// Create return observable from highlights and primaries
return highlights$
.pausableBuffered(primariesExist$)
.withLatestFrom(primaries$)
}