RxJS 和异步 - 了解执行流程
RxJS and async - Understand the execution flow
我很想用RxJS理解async;我写这个例子是为了看看异步函数调用在 RxJS observables 中是如何工作的;结果是:
func1 --- 10
func1 --- 20
func1 --- 40
func1 --- 30
switchMap ---1--- -->item: 0
switchMap ---1--- -->item: 1
switchMap ---1--- -->item: 2
switchMap ---1--- -->item: 3
switchMap ---1--- -->item: 4
switchMap ---1--- -->item: 5
switchMap ---1--- -->item: 6
switchMap ---1--- -->item: 7
END
switchMap ---2--- -->item: 1
switchMap ---3--- -->item: 0.111
switchMap ---3--- -->item: 1.111
switchMap ---3--- -->item: 2.111
switchMap ---3--- -->item: 3.111
switchMap ---3--- -->item: 4.111
switchMap ---3--- -->item: 5.111
switchMap ---3--- -->项目:6.111
switchMap ---3--- -->item: 7.111
switchMap ---3--- -->item: 8.111
switchMap ---3--- -->item: 9.111
MERGEMAP:项目-->1000
MERGEMAP:项目-->2000
N E X T --> 1000
N E X T --> 2000
MERGEMAP:项目-->3000
MERGEMAP:项目-->4000
N E X T --> 3000
N E X T --> 4000
MERGEMAP:项目-->5000
N E X T --> 5000
完成
我不明白为什么 func1 --- 40 出现在 func1 --- 30 之前,为什么 END 打印在 switchMap 的中间 -- -1--- 和 switchMap ---2---
谢谢
import { bufferCount, from, mergeMap, Observable, switchMap } from "rxjs";
module async1 {
async function func1() {
console.log("func1 --- 10");
const loop0 = async () => {
return from([0, 1, 2, 3, 4, 5, 6, 7]);
};
const loop1 = async () => {
return 1;
};
const loop2 = async () => {
const arrayLoop2 = [];
for (let i = 0; i < 10; i++) {
arrayLoop2.push(i + 0.111);
}
return arrayLoop2;
};
const loop3 = async () => {
const arrayLoop3 = [1000, 2000, 3000, 4000, 5000];
return from(arrayLoop3);
};
let myObservable: Observable<number>;
console.log("func1 --- 20");
loop0().then((value) => {
myObservable = value;
const myPipeline = myObservable!.pipe(
switchMap((item) => {
console.log(`switchMap ---1--- -->item: ${item}`);
const loop1Result = from(loop1());
return loop1Result;
}),
switchMap((item) => {
console.log(`switchMap ---2--- -->item: ${item}`);
const loop2Result = loop2();
return loop2Result;
}),
switchMap((items) => {
items.forEach((item) => {
console.log(`switchMap ---3--- -->item: ${item}`);
});
const loop3Result = loop3();
return loop3Result;
}),
switchMap((item) => {
return item;
}),
bufferCount(2),
mergeMap((items) => {
items.forEach((item) => {
console.log(`MERGEMAP: item-->${item}`);
});
return items;
}, 3)
);
console.log("func1 --- 30");
const mySubscription = myPipeline.subscribe({
next: (item) => {
console.log(`N E X T --> ${item}`);
},
error: () => {
console.log(`E R R O R`);
},
complete: () => {
console.log(`C O M P L E T E`);
mySubscription.unsubscribe();
},
});
});
console.log("func1 --- 40");
return true;
}
func1().then((resolve) => {
console.log("***END***");
});
}
您无法控制何时执行与可观察对象相关的代码,因此如果您想确保 func1()
中的整个逻辑在调用下一个 function
之前完成,您不应在 func1()
内使用可观察对象和订阅。您应该将 Promise<Type>
与 await
结合使用。然后你可以这样调用 func1
:
await func1();
如果你想在 RxJS 工作后执行函数 console.log("func1 --- 40")
和 console.log("***END***")
,你应该将它们添加到 complete
的主体中:
complete: () => {
console.log("func1 --- 40")
console.log(`C O M P L E T E`);
console.log("***END***")
mySubscription.unsubscribe();
}
我很想用RxJS理解async;我写这个例子是为了看看异步函数调用在 RxJS observables 中是如何工作的;结果是:
func1 --- 10
func1 --- 20
func1 --- 40
func1 --- 30
switchMap ---1--- -->item: 0
switchMap ---1--- -->item: 1
switchMap ---1--- -->item: 2
switchMap ---1--- -->item: 3
switchMap ---1--- -->item: 4
switchMap ---1--- -->item: 5
switchMap ---1--- -->item: 6
switchMap ---1--- -->item: 7
END
switchMap ---2--- -->item: 1
switchMap ---3--- -->item: 0.111
switchMap ---3--- -->item: 1.111
switchMap ---3--- -->item: 2.111
switchMap ---3--- -->item: 3.111
switchMap ---3--- -->item: 4.111
switchMap ---3--- -->item: 5.111
switchMap ---3--- -->项目:6.111
switchMap ---3--- -->item: 7.111
switchMap ---3--- -->item: 8.111
switchMap ---3--- -->item: 9.111
MERGEMAP:项目-->1000
MERGEMAP:项目-->2000
N E X T --> 1000
N E X T --> 2000
MERGEMAP:项目-->3000
MERGEMAP:项目-->4000
N E X T --> 3000
N E X T --> 4000
MERGEMAP:项目-->5000
N E X T --> 5000
完成
我不明白为什么 func1 --- 40 出现在 func1 --- 30 之前,为什么 END 打印在 switchMap 的中间 -- -1--- 和 switchMap ---2---
谢谢
import { bufferCount, from, mergeMap, Observable, switchMap } from "rxjs";
module async1 {
async function func1() {
console.log("func1 --- 10");
const loop0 = async () => {
return from([0, 1, 2, 3, 4, 5, 6, 7]);
};
const loop1 = async () => {
return 1;
};
const loop2 = async () => {
const arrayLoop2 = [];
for (let i = 0; i < 10; i++) {
arrayLoop2.push(i + 0.111);
}
return arrayLoop2;
};
const loop3 = async () => {
const arrayLoop3 = [1000, 2000, 3000, 4000, 5000];
return from(arrayLoop3);
};
let myObservable: Observable<number>;
console.log("func1 --- 20");
loop0().then((value) => {
myObservable = value;
const myPipeline = myObservable!.pipe(
switchMap((item) => {
console.log(`switchMap ---1--- -->item: ${item}`);
const loop1Result = from(loop1());
return loop1Result;
}),
switchMap((item) => {
console.log(`switchMap ---2--- -->item: ${item}`);
const loop2Result = loop2();
return loop2Result;
}),
switchMap((items) => {
items.forEach((item) => {
console.log(`switchMap ---3--- -->item: ${item}`);
});
const loop3Result = loop3();
return loop3Result;
}),
switchMap((item) => {
return item;
}),
bufferCount(2),
mergeMap((items) => {
items.forEach((item) => {
console.log(`MERGEMAP: item-->${item}`);
});
return items;
}, 3)
);
console.log("func1 --- 30");
const mySubscription = myPipeline.subscribe({
next: (item) => {
console.log(`N E X T --> ${item}`);
},
error: () => {
console.log(`E R R O R`);
},
complete: () => {
console.log(`C O M P L E T E`);
mySubscription.unsubscribe();
},
});
});
console.log("func1 --- 40");
return true;
}
func1().then((resolve) => {
console.log("***END***");
});
}
您无法控制何时执行与可观察对象相关的代码,因此如果您想确保 func1()
中的整个逻辑在调用下一个 function
之前完成,您不应在 func1()
内使用可观察对象和订阅。您应该将 Promise<Type>
与 await
结合使用。然后你可以这样调用 func1
:
await func1();
如果你想在 RxJS 工作后执行函数 console.log("func1 --- 40")
和 console.log("***END***")
,你应该将它们添加到 complete
的主体中:
complete: () => {
console.log("func1 --- 40")
console.log(`C O M P L E T E`);
console.log("***END***")
mySubscription.unsubscribe();
}