防止 RxJS 中的异步管道运算符过早完成

Preventing premature completion of an async pipeable operator in RxJS

我正在使用 RxJS 6 创建 pipeable operators,并且不清楚在操作 异步.[=25 时如何 complete() 观察者=]

对于同步操作,逻辑很简单。在下面的示例中,来自源 Observable 的所有值都将传递给 observer.next(),然后调用 observer.complete()

const syncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => observer.next(x),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

然而,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 setTimeout() 的调用表示。显然,observer.complete() 将在 任何值传递给 observer.next().

之前被调用

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => setTimeout(() => observer.next(x), 100),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

所以问题是:惯用的 RxJS 方法是什么才能使对 observer.complete() 的调用仅在所有值都异步传递给 observer.next() 之后才进行?我应该手动跟踪未决呼叫还是有更多 "reactive" 解决方案?

(请注意,上面的示例是我实际代码的简化,对 setTimeout() 的调用旨在表示 "any asynchronous operation"。我正在寻找一个在管道操作符中处理异步操作的一般方法,而不是关于如何处理 RxJS 中的延迟或超时的建议。)

仍然希望获得关于更多 reactive/idiomatic 实施的意见,但下面是我决定暂时采用的方法。

本质上,我只是为飞行中的操作使用了一个计数器 (pending) 并使操作员仅在源可观察对象完成时才完成 (completed) 并且没有待处理的操作 (!pending)。

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    let pending = 0; // the number of in-flight operations
    let completed = false; // whether or not the source observable completed
    
    return source.subscribe({
      next: (x) => {
        pending++;
        
        setTimeout(() => {              
          observer.next(x);
          
          if (!--pending && completed) { // no ops pending and source completed
            observer.complete();
          }
        }, 100);
      },
      error: (e) => observer.error(err),
      complete: () => {
        completed = true;
        
        if (!pending) { // no ops pending
          observer.complete();
        }
      }
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

我创建了这个 runnable StackBlitz demo 来展示我认为应该做的事情。

这里的想法是使用 toArray() 将源可观察到的所有值获取到一个数组中。 toArray()后面的代码是单值(数组)。

注意:有很多方法(运算符)可以解决问题,这只是基于我从这个问题中理解的一个例子——这对 RxJS Observables 来说既是好事也是坏事。希望这可以帮助。 :-)

主要演示代码为:

// --- for each value, do the async service
of(...[1, 2, 3]).pipe(
  // let each value be processed by both async service...
  concatMap(no => myAsyncService$(no)),
  concatMap(no => myAsyncService2$(no)),

  // --- toArray() combines all the values (i.e. they completed)
  toArray(),

  // --- this will only be called once - with all completed values
  // --- testing: try commenting the toArray() to see the values as individual "next" value
  tap(val => {
    // see the combined values
    console.log(val)
  })
).subscribe();

一种思路可能是重组您的 asyncOp 以使用其他运算符,例如 mergeMap.

这是使用此方法重现您的示例的代码

const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));

这是否值得考虑取决于您的 asyncOp 所做的事情。如果它是异步的,因为它依赖于某些回调,例如 https 调用或从文件系统读取的情况,那么我认为这种方法可以工作,因为您可以将基于回调的函数转换为 Observable。