如何链接可观察对象,例如使用 Promises 或 async/await?
How to chain observables like using Promises or async/await?
让我们想象一下我使用 async/await 运行良好的代码:
async function getMeSomething() : Promise<Something> {
// some async code
}
async function getMeStuff() : Promise<Stuff> {
// some async code
}
async function getMeSomethingWithStuff: Promise<SomethingWithStuff> {
const something: Something = await getMeSomething();
const stuff: Stuff = await getMeStuff();
return new SomethingWithStuff(something, stuff);
}
这段代码可以有效地用 promises 编写,就像任何使用 async/await 的代码一样(只是更长):
// this is strictly equivalent to previous example, including exception management
function getMeSomethingWithStuff: Promise<SomethingWithStuff> {
let something;
return getMeSomething().then((res) => {
something = res;
return getMeStuff();
}).then((stuff) => {
return new SomethingWithStuff(something, stuff);
});
}
当调用第三个函数时,它会调用第一个和第二个函数,合并结果并 return 我以第三个函数的调用者不知道哪些底层函数是叫:
const somethingWithStuff: SomethingWithStuff = getMeSomethingWithStuff();
// we have the result, we don't care how we got it, the logic is completely encapsulated
我们如何仅使用 rxjs 编写等效代码?
// the following two functions are already defined and we can't redefine them
function getMeSomething() : Observable<Something> {
// some async code
}
function getMeStuff() : Observable<Stuff> {
// some async code
}
// we want to define this function that should return an observable
function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
// what's the code ?
}
规则:
- 禁止使用 promises 作弊,async/await 或 observables 和 promises 之间的转换,它只能使用 rxjs
- 必须正确处理异常
看似微不足道的问题,仔细阅读了rxjs的文档和大量的教程,我自己也找不到答案。
您需要使用像 forkJoin
这样的函数来并行触发多个可观察对象,并需要使用像 switchMap
这样的高阶映射运算符来从一个可观察对象映射到另一个。
有关 forkJoin
函数和 switchMap
运算符的简要说明,请参见 。
选项 1:平行
// the following two functions are already defined and we can't redefine them
function getMeSomething() : Observable<Something> {
// some async code
}
function getMeStuff() : Observable<Stuff> {
// some async code
}
function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
return forkJoin({
something: getMeSomething(),
stuff: getMeStuff()
}).pipe(
switchMap((res: any) =>
new SomethingWithStuff(res.something, res.stuff)
)
);
}
选项 2:串行
使用多个高阶映射运算符
function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
return getMeSomething().pipe(
switchMap(something => getMeStuff().pipe(
map(stuff => ({ something, stuff })
),
switchMap((res: any) =>
new SomethingWithStuff(res.something, res.stuff)
)
);
}
更新:添加串行可观察触发器
让我们想象一下我使用 async/await 运行良好的代码:
async function getMeSomething() : Promise<Something> {
// some async code
}
async function getMeStuff() : Promise<Stuff> {
// some async code
}
async function getMeSomethingWithStuff: Promise<SomethingWithStuff> {
const something: Something = await getMeSomething();
const stuff: Stuff = await getMeStuff();
return new SomethingWithStuff(something, stuff);
}
这段代码可以有效地用 promises 编写,就像任何使用 async/await 的代码一样(只是更长):
// this is strictly equivalent to previous example, including exception management
function getMeSomethingWithStuff: Promise<SomethingWithStuff> {
let something;
return getMeSomething().then((res) => {
something = res;
return getMeStuff();
}).then((stuff) => {
return new SomethingWithStuff(something, stuff);
});
}
当调用第三个函数时,它会调用第一个和第二个函数,合并结果并 return 我以第三个函数的调用者不知道哪些底层函数是叫:
const somethingWithStuff: SomethingWithStuff = getMeSomethingWithStuff();
// we have the result, we don't care how we got it, the logic is completely encapsulated
我们如何仅使用 rxjs 编写等效代码?
// the following two functions are already defined and we can't redefine them
function getMeSomething() : Observable<Something> {
// some async code
}
function getMeStuff() : Observable<Stuff> {
// some async code
}
// we want to define this function that should return an observable
function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
// what's the code ?
}
规则:
- 禁止使用 promises 作弊,async/await 或 observables 和 promises 之间的转换,它只能使用 rxjs
- 必须正确处理异常
看似微不足道的问题,仔细阅读了rxjs的文档和大量的教程,我自己也找不到答案。
您需要使用像 forkJoin
这样的函数来并行触发多个可观察对象,并需要使用像 switchMap
这样的高阶映射运算符来从一个可观察对象映射到另一个。
有关 forkJoin
函数和 switchMap
运算符的简要说明,请参见
选项 1:平行
// the following two functions are already defined and we can't redefine them
function getMeSomething() : Observable<Something> {
// some async code
}
function getMeStuff() : Observable<Stuff> {
// some async code
}
function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
return forkJoin({
something: getMeSomething(),
stuff: getMeStuff()
}).pipe(
switchMap((res: any) =>
new SomethingWithStuff(res.something, res.stuff)
)
);
}
选项 2:串行
使用多个高阶映射运算符
function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
return getMeSomething().pipe(
switchMap(something => getMeStuff().pipe(
map(stuff => ({ something, stuff })
),
switchMap((res: any) =>
new SomethingWithStuff(res.something, res.stuff)
)
);
}
更新:添加串行可观察触发器