当 observable 完成时,我应该如何发出单个值?
How should I emit a single value when observable completes?
我想在原始 Observable 完成时 发出 一个值 假设如下所示,使用虚数运算符 mapComplete
:
let arr = ['a','b', 'c'];
from(arr)
.pipe(mapComplete(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log)
//further processed: myValue
我尝试了以下方法但似乎不合适:
1.
from(arr)
.pipe(toArray())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue
问题:如果原来的 observable 是一个巨大的流,我不想将它缓冲到一个数组中,只是为了发出一个值。
2.
from(arr)
.pipe(last())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue
问题: 如果流完成而没有发出任何东西,我会得到一个错误:[Error [EmptyError]: no elements in sequence]
执行上述操作的正确方法(在 rxjs 术语中)是什么?
您可以通过构建自己的自定义运算符来实现您想要的。
代码可能如下所示
const emitWhenComplete = <T>(val: T) => <U>(source: Observable<U>) =>
new Observable<T>((observer) => {
return source.subscribe({
error: (err) => observer.error(err),
complete: () => {
observer.next(val);
observer.complete();
},
});
});
基本上这个运算符会接受源可观察对象,忽略它发出的所有值,并且仅在源完成时发出。
您可以查看 this stackblitz 进行一些测试。
您可以通过 ignoreElements
to not emit anything and endWith
在完成时发出一个值来实现此目的。
from(arr).pipe(
ignoreElements(),
endWith('myValue'),
map(v => `further processed: ${v}`)
).subscribe(console.log);
如果你想在 map
中执行一个函数,你可以预先使用 count()
在完成时发出一个值(发出的值的数量)。
from(arr).pipe(
count(), // could also use "reduce(() => null, 0)" or "last(null, 0)" or "takeLast(1), defaultIfEmpty(0)"
map(() => getMyValue()),
map(v => `further processed: ${v}`)
).subscribe(console.log);
您还可以使用具有默认值的 last()
运算符。当流为空时,它将消除 no elements in sequence
错误。
from(arr).pipe(
last(null, 'myValue'), // `null` denotes no predicate
map(_ => 'myValue'), // map the last value from the stream
map((v)=>`further processed: ${v}`)
).subscribe(console.log);
我想在原始 Observable 完成时 发出 一个值 假设如下所示,使用虚数运算符 mapComplete
:
let arr = ['a','b', 'c'];
from(arr)
.pipe(mapComplete(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log)
//further processed: myValue
我尝试了以下方法但似乎不合适:
1.
from(arr)
.pipe(toArray())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue
问题:如果原来的 observable 是一个巨大的流,我不想将它缓冲到一个数组中,只是为了发出一个值。
2.
from(arr)
.pipe(last())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue
问题: 如果流完成而没有发出任何东西,我会得到一个错误:[Error [EmptyError]: no elements in sequence]
执行上述操作的正确方法(在 rxjs 术语中)是什么?
您可以通过构建自己的自定义运算符来实现您想要的。
代码可能如下所示
const emitWhenComplete = <T>(val: T) => <U>(source: Observable<U>) =>
new Observable<T>((observer) => {
return source.subscribe({
error: (err) => observer.error(err),
complete: () => {
observer.next(val);
observer.complete();
},
});
});
基本上这个运算符会接受源可观察对象,忽略它发出的所有值,并且仅在源完成时发出。
您可以查看 this stackblitz 进行一些测试。
您可以通过 ignoreElements
to not emit anything and endWith
在完成时发出一个值来实现此目的。
from(arr).pipe(
ignoreElements(),
endWith('myValue'),
map(v => `further processed: ${v}`)
).subscribe(console.log);
如果你想在 map
中执行一个函数,你可以预先使用 count()
在完成时发出一个值(发出的值的数量)。
from(arr).pipe(
count(), // could also use "reduce(() => null, 0)" or "last(null, 0)" or "takeLast(1), defaultIfEmpty(0)"
map(() => getMyValue()),
map(v => `further processed: ${v}`)
).subscribe(console.log);
您还可以使用具有默认值的 last()
运算符。当流为空时,它将消除 no elements in sequence
错误。
from(arr).pipe(
last(null, 'myValue'), // `null` denotes no predicate
map(_ => 'myValue'), // map the last value from the stream
map((v)=>`further processed: ${v}`)
).subscribe(console.log);