使用 RxJS 如何缓冲函数调用,直到另一个异步函数调用已解决

Using RxJS how to buffer function calls until an other async function call has resolved

如何使用 RxJS 缓冲函数调用,直到另一个异步函数已解析?

这是我想要完成的一个简单示例

function asyncFunc(time) {
  setTimeout(() => {
      console.log('asyncFunc has resolved');
   }, time);
}

function funcToBuffer(time) {
  setTimeout(() => {
    console.log(time);
  }, time);
}

asyncFunc(3000);

funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);

asyncFunc(8000);

funcToBuffer(6000);
funcToBuffer(7000);

此时此代码将打印:

1000
2000
asyncFunc has resolved
4000
5000
6000
7000
asyncFunc has resolved

我要打印的是:

asyncFunc has resolved
1000
2000    
4000
5000
asyncFunc has resolved
6000
7000

本质上,我想要某种控制流,允许我随时调用 funcToBuffer,但在幕后,我希望它在 asyncFunc 正在执行并等待解析时继续执行。一旦 asyncFunc 已解决,funcToBuffer 调用不应再被缓冲并立即执行。

我试过使用缓冲区运算符,但无法达到预期的结果。

CombineLatest 等待两个 observables 触发。

const { of, combineLatest } = rxjs;
const { delay } = rxjs.operators;


let obs1$ = of(1).pipe(delay(1000));
let obs2$ = of(2).pipe(delay(2000));

let now = new Date();
combineLatest(obs1$, obs2$).subscribe(([obs1, obs2]) => {
  let ellapsed = new Date().getTime() - now.getTime();
  console.log(`${obs1} - ${obs2} took ${ellapsed}`);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js"></script>

我开始使用 combineLatest 来研究解决方案,但我认为 BehaviorSubject 会是一个更好的解决方案,一旦我对它进行了更多思考。

const { BehaviorSubject } = rxjs;
const { filter } = rxjs.operators;

let finalised$ = new BehaviorSubject(false);

function asyncFunc(time) {
  setTimeout(() => {
      console.log('asyncFunc has resolved');
      if (!finalised$.getValue()) {
        finalised$.next(true);
      }
  }, time);
}

function funcToBuffer(time) {
  finalised$.pipe(filter(finalised => finalised)).subscribe(_ => { // Filter so only fire finalised being true
    setTimeout(() => {
      console.log(time);
    }, time);
  });
}

asyncFunc(3000);

funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);

asyncFunc(8000);

funcToBuffer(6000);
funcToBuffer(7000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js"></script>

如果我没理解错的话,你的主要目标是通过一种机制来控制一系列函数的执行,这种机制可以缓冲它们直到发生某些事情,而这正是触发缓冲函数执行的原因。

如果这是正确的,以下可能是您问题的可能解决方案的基础

const functions$ = new Subject<() => any>();

const buffer$ = new Subject<any>();
const executeBuffer$ = new Subject<any>();
const setBuffer = (executionDelay: number) => {
    buffer$.next();
    setTimeout(() => {
        executeBuffer$.next();
    }, executionDelay);
}

const functionBuffer$ = functions$
.pipe(
    bufferWhen(() => buffer$),
);

zip(functionBuffer$, executeBuffer$)
.pipe(
    tap(functionsAndExecuteSignal => functionsAndExecuteSignal[0].forEach(f => f()))
)
.subscribe();

让我解释一下代码。

首先,我们构建 functions$,即我们要控制的函数的 Observable。 Observable 是使用 Subject 构建的,因为我们希望能够以编程方式控制此类 Observable 的通知。换句话说,我们创建函数(作为对象)并要求 functions$ Observable 像这样

发出函数,而不是启动函数的执行 funcToBuffer(1000)
const aFunction = () => setTimeout(() => {console.log('I am a function that completes in 1 second');}, 1000);
functions$.next(aFunction);

通过这种方式,我们创建了最终将要执行的函数流。

第二件事,我们再创建 2 个 Observables,buffer$executeBuffer$,再次使用 Subjects。此类 Observables 用于发出信号,指示何时我们必须从 functions$ 发出的函数中创建缓冲区,以及何时必须开始执行缓冲的函数。

最后 2 个 Observables 在函数 setBuffer 中使用。当你调用 setBuffer 时,你基本上是在说:请创建一个缓冲区,其中包含 functions$ 迄今为止发出的所有函数,并在指定为参数的 executionDelay 时间后开始执行它们。

缓冲部分由使用 bufferWhen 运算符创建的 functionBuffer$ Observable 执行。执行部分是利用 zip 运算符实现的,它允许我们根据 executeBuffer$ Observable 的发射设置函数的执行节奏。

您可以测试上面的代码设置以下测试数据。

let f: () => any;
setBuffer(3000);
f = () => setTimeout(() => {console.log('f1');}, 1000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f2');}, 2000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f4');}, 4000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f5');}, 5000);
functions$.next(f);
setBuffer(8000);
f = () => setTimeout(() => {console.log('f6');}, 6000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f7');}, 7000);
functions$.next(f);
setBuffer(16000);