当 observable 完成时,我应该如何发出单个值?

How should I emit a single value when observable completes?

我想在原始 Observable 完成时 发出 一个值 假设如下所示,使用虚数运算符 mapComplete:

let arr = ['a','b', 'c'];

from(arr)
.pipe(mapComplete(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log)
//further processed: myValue

我尝试了以下方法但似乎不合适:

1.

from(arr)
.pipe(toArray())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue

问题:如果原来的 observable 是一个巨大的流,我不想将它缓冲到一个数组中,只是为了发出一个值。

2.

from(arr)
.pipe(last())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue

问题: 如果流完成而没有发出任何东西,我会得到一个错误:[Error [EmptyError]: no elements in sequence]

执行上述操作的正确方法(在 rxjs 术语中)是什么?

您可以通过构建自己的自定义运算符来实现您想要的。

代码可能如下所示

const emitWhenComplete = <T>(val: T) => <U>(source: Observable<U>) =>
  new Observable<T>((observer) => {
    return source.subscribe({
      error: (err) => observer.error(err),
      complete: () => {
        observer.next(val);
        observer.complete();
      },
    });
  });

基本上这个运算符会接受源可观察对象,忽略它发出的所有值,并且仅在源完成时发出。

您可以查看 this stackblitz 进行一些测试。

您可以通过 ignoreElements to not emit anything and endWith 在完成时发出一个值来实现此目的。

from(arr).pipe(
  ignoreElements(),
  endWith('myValue'),
  map(v => `further processed: ${v}`)
).subscribe(console.log);

如果你想在 map 中执行一个函数,你可以预先使用 count() 在完成时发出一个值(发出的值的数量)。

from(arr).pipe(
  count(), // could also use "reduce(() => null, 0)" or "last(null, 0)" or "takeLast(1), defaultIfEmpty(0)" 
  map(() => getMyValue()),
  map(v => `further processed: ${v}`)
).subscribe(console.log);

您还可以使用具有默认值的 last() 运算符。当流为空时,它将消除 no elements in sequence 错误。

from(arr).pipe(
  last(null, 'myValue'),  // `null` denotes no predicate
  map(_ => 'myValue'),    // map the last value from the stream
  map((v)=>`further processed: ${v}`)
).subscribe(console.log);