我如何迭代 return rxjs observables 的函数
How do I iterate over functions that return rxjs observables
我想迭代一系列异步函数,并在返回 false 时结束迭代。
我是 rxjs
的新手,无法使用下面的用例。我觉得我不理解一些基本的东西。有人可以指出给我吗?
function validateA(): Observable<any> {
// do stuff.
return of({ id: "A", result: true }); // hardcoding result for now
}
function validateB(): Observable<any> {
// do stuff
return of({ id: "B", result: true }); // hardcoding result for now
}
function validateC(): Observable<any> {
// do stuff
return of({ id: "C", result: false });// hardcoding result for now
}
from([validateA, validateB, validateC])
.pipe(
map(data => data()),
takeWhile(data => !!data.result)
)
.subscribe(data => console.log(`${data.id} passed!`));
https://stackblitz.com/edit/typescript-ub9c5r?file=index.ts&devtoolsheight=100
以下似乎可以解决问题并延迟调用函数:
https://stackblitz.com/edit/typescript-9ystxv?file=index.ts
import { from, Observable, of } from "rxjs";
import { concatAll, find, map } from "rxjs/operators";
function validateA() {
console.log('validateA');
return of({ id: "A", result: false });
}
function validateB() {
console.log('validateB');
return of({ id: "B", result: true });
}
function validateC() {
console.log('validateC');
return of({ id: "C", result: false });
}
from([validateA, validateB, validateC])
.pipe(
map(validate => validate()),
concatAll(),
find(data => data.result)
)
.subscribe(data => console.log(`${data.id} passed!`));
我想说你的核心逻辑是对的。缺少的是一些 rxJs 的特殊性。
解决方案可能是这样的。细微差别的解释在评论中。
// start from an array of functions and turn it into a stream using RxJs from function
from([validateA, validateB, validateC])
.pipe(
// now execute each function sequentially, one after the other, via concatMap
// operator. This operator calls each function and each function returns an Observable
// concatMap ensures that the functions are called sequentially and also that the returned Observable (because each function returns an Observable)
// is "flattened" in the result stream. In other words, you execute each function one at the time
// and return the value emitted by the Observable returned by that function
// until that Observable completes. Considering that you use the "of" function to
// create the Observable which is returned by each function, such Observable emits just one value and then completes.
concatMap(func => func()),
// now you have a stream of values notified by the Observables returned by the functions
// and you terminate as soon as a flase is received
takeWhile(data => !!data.result)
)
.subscribe(data => console.log(`${data.id} passed!`));
我想迭代一系列异步函数,并在返回 false 时结束迭代。
我是 rxjs
的新手,无法使用下面的用例。我觉得我不理解一些基本的东西。有人可以指出给我吗?
function validateA(): Observable<any> {
// do stuff.
return of({ id: "A", result: true }); // hardcoding result for now
}
function validateB(): Observable<any> {
// do stuff
return of({ id: "B", result: true }); // hardcoding result for now
}
function validateC(): Observable<any> {
// do stuff
return of({ id: "C", result: false });// hardcoding result for now
}
from([validateA, validateB, validateC])
.pipe(
map(data => data()),
takeWhile(data => !!data.result)
)
.subscribe(data => console.log(`${data.id} passed!`));
https://stackblitz.com/edit/typescript-ub9c5r?file=index.ts&devtoolsheight=100
以下似乎可以解决问题并延迟调用函数:
https://stackblitz.com/edit/typescript-9ystxv?file=index.ts
import { from, Observable, of } from "rxjs";
import { concatAll, find, map } from "rxjs/operators";
function validateA() {
console.log('validateA');
return of({ id: "A", result: false });
}
function validateB() {
console.log('validateB');
return of({ id: "B", result: true });
}
function validateC() {
console.log('validateC');
return of({ id: "C", result: false });
}
from([validateA, validateB, validateC])
.pipe(
map(validate => validate()),
concatAll(),
find(data => data.result)
)
.subscribe(data => console.log(`${data.id} passed!`));
我想说你的核心逻辑是对的。缺少的是一些 rxJs 的特殊性。
解决方案可能是这样的。细微差别的解释在评论中。
// start from an array of functions and turn it into a stream using RxJs from function
from([validateA, validateB, validateC])
.pipe(
// now execute each function sequentially, one after the other, via concatMap
// operator. This operator calls each function and each function returns an Observable
// concatMap ensures that the functions are called sequentially and also that the returned Observable (because each function returns an Observable)
// is "flattened" in the result stream. In other words, you execute each function one at the time
// and return the value emitted by the Observable returned by that function
// until that Observable completes. Considering that you use the "of" function to
// create the Observable which is returned by each function, such Observable emits just one value and then completes.
concatMap(func => func()),
// now you have a stream of values notified by the Observables returned by the functions
// and you terminate as soon as a flase is received
takeWhile(data => !!data.result)
)
.subscribe(data => console.log(`${data.id} passed!`));