使用 RxJS 如何缓冲函数调用,直到另一个异步函数调用已解决
Using RxJS how to buffer function calls until an other async function call has resolved
如何使用 RxJS 缓冲函数调用,直到另一个异步函数已解析?
这是我想要完成的一个简单示例
function asyncFunc(time) {
setTimeout(() => {
console.log('asyncFunc has resolved');
}, time);
}
function funcToBuffer(time) {
setTimeout(() => {
console.log(time);
}, time);
}
asyncFunc(3000);
funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);
asyncFunc(8000);
funcToBuffer(6000);
funcToBuffer(7000);
此时此代码将打印:
1000
2000
asyncFunc has resolved
4000
5000
6000
7000
asyncFunc has resolved
我要打印的是:
asyncFunc has resolved
1000
2000
4000
5000
asyncFunc has resolved
6000
7000
本质上,我想要某种控制流,允许我随时调用 funcToBuffer,但在幕后,我希望它在 asyncFunc 正在执行并等待解析时继续执行。一旦 asyncFunc 已解决,funcToBuffer 调用不应再被缓冲并立即执行。
我试过使用缓冲区运算符,但无法达到预期的结果。
CombineLatest 等待两个 observables 触发。
const { of, combineLatest } = rxjs;
const { delay } = rxjs.operators;
let obs1$ = of(1).pipe(delay(1000));
let obs2$ = of(2).pipe(delay(2000));
let now = new Date();
combineLatest(obs1$, obs2$).subscribe(([obs1, obs2]) => {
let ellapsed = new Date().getTime() - now.getTime();
console.log(`${obs1} - ${obs2} took ${ellapsed}`);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js"></script>
我开始使用 combineLatest 来研究解决方案,但我认为 BehaviorSubject 会是一个更好的解决方案,一旦我对它进行了更多思考。
const { BehaviorSubject } = rxjs;
const { filter } = rxjs.operators;
let finalised$ = new BehaviorSubject(false);
function asyncFunc(time) {
setTimeout(() => {
console.log('asyncFunc has resolved');
if (!finalised$.getValue()) {
finalised$.next(true);
}
}, time);
}
function funcToBuffer(time) {
finalised$.pipe(filter(finalised => finalised)).subscribe(_ => { // Filter so only fire finalised being true
setTimeout(() => {
console.log(time);
}, time);
});
}
asyncFunc(3000);
funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);
asyncFunc(8000);
funcToBuffer(6000);
funcToBuffer(7000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js"></script>
如果我没理解错的话,你的主要目标是通过一种机制来控制一系列函数的执行,这种机制可以缓冲它们直到发生某些事情,而这正是触发缓冲函数执行的原因。
如果这是正确的,以下可能是您问题的可能解决方案的基础
const functions$ = new Subject<() => any>();
const buffer$ = new Subject<any>();
const executeBuffer$ = new Subject<any>();
const setBuffer = (executionDelay: number) => {
buffer$.next();
setTimeout(() => {
executeBuffer$.next();
}, executionDelay);
}
const functionBuffer$ = functions$
.pipe(
bufferWhen(() => buffer$),
);
zip(functionBuffer$, executeBuffer$)
.pipe(
tap(functionsAndExecuteSignal => functionsAndExecuteSignal[0].forEach(f => f()))
)
.subscribe();
让我解释一下代码。
首先,我们构建 functions$
,即我们要控制的函数的 Observable。 Observable 是使用 Subject 构建的,因为我们希望能够以编程方式控制此类 Observable 的通知。换句话说,我们创建函数(作为对象)并要求 functions$
Observable 像这样
发出函数,而不是启动函数的执行 funcToBuffer(1000)
const aFunction = () => setTimeout(() => {console.log('I am a function that completes in 1 second');}, 1000);
functions$.next(aFunction);
通过这种方式,我们创建了最终将要执行的函数流。
第二件事,我们再创建 2 个 Observables,buffer$
和 executeBuffer$
,再次使用 Subjects。此类 Observables 用于发出信号,指示何时我们必须从 functions$
发出的函数中创建缓冲区,以及何时必须开始执行缓冲的函数。
最后 2 个 Observables 在函数 setBuffer
中使用。当你调用 setBuffer
时,你基本上是在说:请创建一个缓冲区,其中包含 functions$
迄今为止发出的所有函数,并在指定为参数的 executionDelay
时间后开始执行它们。
缓冲部分由使用 bufferWhen
运算符创建的 functionBuffer$
Observable 执行。执行部分是利用 zip
运算符实现的,它允许我们根据 executeBuffer$
Observable 的发射设置函数的执行节奏。
您可以测试上面的代码设置以下测试数据。
let f: () => any;
setBuffer(3000);
f = () => setTimeout(() => {console.log('f1');}, 1000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f2');}, 2000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f4');}, 4000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f5');}, 5000);
functions$.next(f);
setBuffer(8000);
f = () => setTimeout(() => {console.log('f6');}, 6000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f7');}, 7000);
functions$.next(f);
setBuffer(16000);
如何使用 RxJS 缓冲函数调用,直到另一个异步函数已解析?
这是我想要完成的一个简单示例
function asyncFunc(time) {
setTimeout(() => {
console.log('asyncFunc has resolved');
}, time);
}
function funcToBuffer(time) {
setTimeout(() => {
console.log(time);
}, time);
}
asyncFunc(3000);
funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);
asyncFunc(8000);
funcToBuffer(6000);
funcToBuffer(7000);
此时此代码将打印:
1000
2000
asyncFunc has resolved
4000
5000
6000
7000
asyncFunc has resolved
我要打印的是:
asyncFunc has resolved
1000
2000
4000
5000
asyncFunc has resolved
6000
7000
本质上,我想要某种控制流,允许我随时调用 funcToBuffer,但在幕后,我希望它在 asyncFunc 正在执行并等待解析时继续执行。一旦 asyncFunc 已解决,funcToBuffer 调用不应再被缓冲并立即执行。
我试过使用缓冲区运算符,但无法达到预期的结果。
CombineLatest 等待两个 observables 触发。
const { of, combineLatest } = rxjs;
const { delay } = rxjs.operators;
let obs1$ = of(1).pipe(delay(1000));
let obs2$ = of(2).pipe(delay(2000));
let now = new Date();
combineLatest(obs1$, obs2$).subscribe(([obs1, obs2]) => {
let ellapsed = new Date().getTime() - now.getTime();
console.log(`${obs1} - ${obs2} took ${ellapsed}`);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js"></script>
我开始使用 combineLatest 来研究解决方案,但我认为 BehaviorSubject 会是一个更好的解决方案,一旦我对它进行了更多思考。
const { BehaviorSubject } = rxjs;
const { filter } = rxjs.operators;
let finalised$ = new BehaviorSubject(false);
function asyncFunc(time) {
setTimeout(() => {
console.log('asyncFunc has resolved');
if (!finalised$.getValue()) {
finalised$.next(true);
}
}, time);
}
function funcToBuffer(time) {
finalised$.pipe(filter(finalised => finalised)).subscribe(_ => { // Filter so only fire finalised being true
setTimeout(() => {
console.log(time);
}, time);
});
}
asyncFunc(3000);
funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);
asyncFunc(8000);
funcToBuffer(6000);
funcToBuffer(7000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js"></script>
如果我没理解错的话,你的主要目标是通过一种机制来控制一系列函数的执行,这种机制可以缓冲它们直到发生某些事情,而这正是触发缓冲函数执行的原因。
如果这是正确的,以下可能是您问题的可能解决方案的基础
const functions$ = new Subject<() => any>();
const buffer$ = new Subject<any>();
const executeBuffer$ = new Subject<any>();
const setBuffer = (executionDelay: number) => {
buffer$.next();
setTimeout(() => {
executeBuffer$.next();
}, executionDelay);
}
const functionBuffer$ = functions$
.pipe(
bufferWhen(() => buffer$),
);
zip(functionBuffer$, executeBuffer$)
.pipe(
tap(functionsAndExecuteSignal => functionsAndExecuteSignal[0].forEach(f => f()))
)
.subscribe();
让我解释一下代码。
首先,我们构建 functions$
,即我们要控制的函数的 Observable。 Observable 是使用 Subject 构建的,因为我们希望能够以编程方式控制此类 Observable 的通知。换句话说,我们创建函数(作为对象)并要求 functions$
Observable 像这样
funcToBuffer(1000)
const aFunction = () => setTimeout(() => {console.log('I am a function that completes in 1 second');}, 1000);
functions$.next(aFunction);
通过这种方式,我们创建了最终将要执行的函数流。
第二件事,我们再创建 2 个 Observables,buffer$
和 executeBuffer$
,再次使用 Subjects。此类 Observables 用于发出信号,指示何时我们必须从 functions$
发出的函数中创建缓冲区,以及何时必须开始执行缓冲的函数。
最后 2 个 Observables 在函数 setBuffer
中使用。当你调用 setBuffer
时,你基本上是在说:请创建一个缓冲区,其中包含 functions$
迄今为止发出的所有函数,并在指定为参数的 executionDelay
时间后开始执行它们。
缓冲部分由使用 bufferWhen
运算符创建的 functionBuffer$
Observable 执行。执行部分是利用 zip
运算符实现的,它允许我们根据 executeBuffer$
Observable 的发射设置函数的执行节奏。
您可以测试上面的代码设置以下测试数据。
let f: () => any;
setBuffer(3000);
f = () => setTimeout(() => {console.log('f1');}, 1000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f2');}, 2000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f4');}, 4000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f5');}, 5000);
functions$.next(f);
setBuffer(8000);
f = () => setTimeout(() => {console.log('f6');}, 6000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f7');}, 7000);
functions$.next(f);
setBuffer(16000);