Rx.Observable.ForkJoin 在 RxJS 和并行异步中(使用 X-Ray)
Rx.Observable.ForkJoin in RxJS and parallel async (using X-Ray)
我正在尝试找出如何 运行 并行(在本例中为 10)基于使用 lapwinglabs/x-ray webscraper 的网站解析数据流的异步函数。
let pauser = new Rx.Subject()
let count = 0
let max = 10
// function that parse a single url to retrieve data
// return Observable
let parsing_each_link = url => {
return Rx.Observable.create(
observer => {
xray(url, selector)((err, data) => {
if (err) observer.onError(err)
observer.onNext(data)
observer.onCompleted()
})
})
}
// retrieve all the urls from a main page => node stream
let streamNode = xray(main_url, selector)
.paginate(some_selector)
.write()
.pipe(JSONStream.parse('*'))
// convert node stream to RxJS
let streamRx = RxNode.fromStream(streamNode)
.do(() => {
if (count === max) {
pauser.onNext(true)
count = 0
}
})
.do(() => count++)
.buffer(pauser) // take only 10 url by 10 url
streamRx.subscribe(
ten_urls => {
Rx.Observable.forkJoin(
ten_urls.map(url => parsing_each_link(url))
)
.subscribe(
x => console.log("Next : ", JSON.stringify(x, null, 4))
)
}
)
Next on the last console.log 永远不会调用 ?!?
无法确定,但如果您可以确保 ten_urls
按预期发出,那么下一步就是确保可观察 parsing_each_link
确实完成,因为 forkJoin
将等待其每个源可观察量的最后一个值。
我在您的代码中看不到任何对 observer.onComplete
的调用。
我正在尝试找出如何 运行 并行(在本例中为 10)基于使用 lapwinglabs/x-ray webscraper 的网站解析数据流的异步函数。
let pauser = new Rx.Subject()
let count = 0
let max = 10
// function that parse a single url to retrieve data
// return Observable
let parsing_each_link = url => {
return Rx.Observable.create(
observer => {
xray(url, selector)((err, data) => {
if (err) observer.onError(err)
observer.onNext(data)
observer.onCompleted()
})
})
}
// retrieve all the urls from a main page => node stream
let streamNode = xray(main_url, selector)
.paginate(some_selector)
.write()
.pipe(JSONStream.parse('*'))
// convert node stream to RxJS
let streamRx = RxNode.fromStream(streamNode)
.do(() => {
if (count === max) {
pauser.onNext(true)
count = 0
}
})
.do(() => count++)
.buffer(pauser) // take only 10 url by 10 url
streamRx.subscribe(
ten_urls => {
Rx.Observable.forkJoin(
ten_urls.map(url => parsing_each_link(url))
)
.subscribe(
x => console.log("Next : ", JSON.stringify(x, null, 4))
)
}
)
Next on the last console.log 永远不会调用 ?!?
无法确定,但如果您可以确保 ten_urls
按预期发出,那么下一步就是确保可观察 parsing_each_link
确实完成,因为 forkJoin
将等待其每个源可观察量的最后一个值。
我在您的代码中看不到任何对 observer.onComplete
的调用。