等待完成然后发出一个值的 RxJS 运算符

RxJS operator that waits for completion and then emits one value

是否有 RxJS 运算符等待源完成然后发出给定值?如果有none,我怎么能自己提供呢?

这与 toArray() 的工作方式类似,后者也会等待源完成。我不想收集所有发出的值,而是想忽略它们并 return 一个不同的值。

这是一个等效的实现:

observable.pipe(
  ignoreElements(),
  endWith(myValue),
);

或者:

observable.pipe(
  toArray(),
  map(ignore => myValue)
)

在很多情况下我需要这个。我得出的结论是,通过 switchMap()mergeMap() 将 promise then 链转换为 observable 是危险的,因为内部 observable 可以在根本不发出任何值的情况下完成。最近我们遇到了这个问题:

return getEntitiesFromBackend().pipe(
  switchMap(entities => {
    return putEntitiesToObjectStore(entities);
  }),
  switchMap(() => {
    return storeSyncDate();
  })
);

在某些情况下,同步日期没有被存储,而且很难找出原因。最后,原因是 putEntities... 方法为其“put”操作发出了一个值。但是在那些情况下,entities 数组是空的,所以根本没有发出任何值。

这就是我真正想做的 - 翻译成承诺世界:

return getEntitiesFromBackend()
  .then(entities => {
    return putEntitiesToObjectStore(entities);
  })
  .then(() => {
    return storeSyncDate();
  })
);

我看到的大多数使用switchMap/mergeMap的代码都没有这个问题。因为大多数时候您处理的 HTTP 请求只发出一次然后完成。例如,参见 here。这让我习惯于使用 switchMap 将典型的 promise 模式转换为 RxJS 世界,而没有过多考虑它的实际工作和目的。 现在,我们使用 IndexedDB 的大多数方法 return observables 为每个 DB 操作发出一个值。 switchMap / mergeMap 会在这里做噩梦。

这就是为什么我要求这样的运算符并想知道为什么我找不到它,因为它在我们的应用程序中很常见。我可以通过使用上面的替代实现轻松解决这个问题,但不想一遍又一遍地重复这两个运算符:

return getEntitiesFromBackend().pipe(
  switchMap(entities => {
    return putEntitiesToObjectStore(entities);
  }),
  ignoreElements(),
  endWith(),
  switchMap(() => {
    return storeSyncDate();
  })
);

当然我可以使用 toArray() 并忽略下一个运算符中的参数。我不喜欢它,因为它会导致不必要的开销。

使用 finalize 运算符怎么样?

https://rxjs.dev/api/operators/finalize

我觉得你想实现这个目标:

source.pipe(
   last(), // emits only last value, when the source completes
   map(() => myValue), // or mapTo(myValue)
)

我建议进行两项更改。

  1. 使用 concatMap 而不是 switchMapmergeMapconcatMap 将确保来自 getEntitiesFromBackend() Observable 的每个发射都将一个接一个地顺序转发,而不是并行(mergeMap)或被取消(switchMap)。有关不同类型的高阶映射运算符的简要介绍,请参阅

  2. 而不是像 ignoreElements + map 这样的运算符组合,您可以使用 last 运算符和一个始终 returns false 具有默认值。这样,当源可观察对象完成时,将发出默认值。

return getEntitiesFromBackend().pipe(
  concatMap(entities => {
    return putEntitiesToObjectStore(entities).pipe(
      last(() => false, myValue),
    );
  }),
  concatMap((myValue) => {
    return storeSyncDate();
  })
);

这是我的尝试,但我不知道它是否正常工作(即正确处理错误等)

function emitOnComplete(value) {
  return function<T>(source: Observable<T>): Observable<T> {
    return new Observable(subscriber => {
      return source.subscribe({
        next() {},
        complete() {
          subscriber.next(value);
          subscriber.complete();
        }
      });
    });
  };
}

或者我想出了这个,但它未经测试:

function emitOnComplete(value) {
  return function <T>(source: Observable<T>): Observable<T> {
    return source.pipe(
      ignoreElements(),
      // concat() is used to make sure we emit the value AFTER source completed
      // The tick is: source will never emit any value, because we use ignoreElements() above.
      // So the user will only get `of(value)`, but after the source is completed
      s => concat(s, of(value)),
    );
  }
}

我想到了几个选项。

defaultIfEmpty

这与您描述的不完全一样,但它可能适合您的用例。

defaultIfEmpty: Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

return getEntitiesFromBackend().pipe(
  switchMap(entities => putEntitiesToObjectStore(entities)),
  defualtIfEmpty(null),
  switchMap(_ => storeSyncDate())
);

创建您自己的运算符

有一个静态版本的 pipe 可以在不应用任何给定流的情况下执行组合。

因此,例如:

const a = source.pipe(
  ignoreElements(),
  endWith(myValue)
);

const b = pipe(
  ignoreElements(),
  endWith(myValue)
);

这里,a 是可观察的。管道组合两个运算符,然后 returns 将组合运算符应用于 source.

的结果 另一方面,

b 只是第一步。 b 本身就是一个运算符。该运算符尚未应用于可观察对象。

所以你可以这样做:

source1.pipe(b);
source2.pipe(b);

我已经重复使用我的 b 运算符两次。我们已经完成大部分工作了!

RxJS 运算符非常有用,因为它们由根据您的需要自定义运算符的函数返回。在上述情况下,每次使用 b 时,endwith.

的值都相同

我们可以将 b 包装在一个函数中,以便在每次使用的基础上进行自定义。

const myCustomOperator = myValue => pipe(
  ignoreElements(),
  endWith(myValue)
);

source1.pipe(
  myCustomOperator(22)
);

source2.pipe(
  myCustomOperator(23)
);

这与任何其他运算符一样工作,因此它也可以与所有其他标准运算符组合(“管道化”)。

I could easily solve this by using the alternative implementations above, but don't want to repeat those two operators over and over again:

现在您有了一段可重复使用的代码!


我将如何实现:

JavaScript:

function ignoreThenConcat(genObs) {
  return pipe(
    ignoreElements(),
    concatWith(from(genObs()))
  );
}

[...]
  return getEntitiesFromBackend().pipe(
    switchMap(entities => putEntitiesToObjectStore(entities)),
    ignoreThenConcat(() => storeSyncDate())
  );
[...]

我发现经常看到静态类型的代码确实有助于我的理解。所以在 TypeScript 中也是一样的:

function ignoreThenConcat<T,R>(genObs: () => ObservableInput<R>): OperatorFunction<T,R> {
  return pipe(
    ignoreElements(),
    concatWith(from(genObs()))
  );
}

[...]
  return getEntitiesFromBackend().pipe(
    switchMap(entities => putEntitiesToObjectStore(entities)),
    ignoreThenConcat(() => storeSyncDate())
  );
[...]