使用背压处理来自 rxjs 可观察订阅的数据的正确方法
correct way to process data from an rxjs oberservable subscription with back-pressure
我有一个 rxjs.observable
(rxjs 版本 6.2.1),returns urls
我需要向其发出 GET
请求。
var subscription = urlObservable$.subscribe(
function (url) {
console.log('new URL: ' + url);
processURL(url)
},
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }
);
对于每个 url
,我需要通过函数 processURL(url)
发出请求。根据反应哲学,处理所有这些传入的 url 并一个一个地发出请求而不是在 subscribe
发出数据后立即触发所有请求的正确方法是什么?请注意,在这种情况下,可观察 urlObservable$
将 return 数据比 returned url
需要发出的请求快得多。
processURL
可以return一个promise
。
谢谢。
如果 urlObservable$
只发出字符串,你可以简单地使用 concatMap
,它总是等到前一个 Observable 完成:
urlObservable$
.pipe(
concatMap(url => processURL(url)),
)
.subscribe(...);
即使 processURL
returns Promise 也能正常工作。
我有一个 rxjs.observable
(rxjs 版本 6.2.1),returns urls
我需要向其发出 GET
请求。
var subscription = urlObservable$.subscribe(
function (url) {
console.log('new URL: ' + url);
processURL(url)
},
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }
);
对于每个 url
,我需要通过函数 processURL(url)
发出请求。根据反应哲学,处理所有这些传入的 url 并一个一个地发出请求而不是在 subscribe
发出数据后立即触发所有请求的正确方法是什么?请注意,在这种情况下,可观察 urlObservable$
将 return 数据比 returned url
需要发出的请求快得多。
processURL
可以return一个promise
。
谢谢。
如果 urlObservable$
只发出字符串,你可以简单地使用 concatMap
,它总是等到前一个 Observable 完成:
urlObservable$
.pipe(
concatMap(url => processURL(url)),
)
.subscribe(...);
即使 processURL
returns Promise 也能正常工作。