获取链式 Observable 发出的最后两个值
Getting last two values emitted from a chained Observable
我有两个 Observable,我正试图将它们链接在一起。我需要订阅两者并根据两者发出的值采取行动。我觉得我在我的设置中做错了什么,想问问这是否是完成此操作的最佳方法。
基本上我有一个 Observable 从流中发出值。在第二个值中,我想要从第一个 Observable 发出的最后两个值。这是因为如果第一个 Observable 抛出错误,我需要知道在抛出错误之前响应是什么。
此外,我希望它们按顺序执行,所以如果 first
observable 发出:
[0, 1, 2, <error>]
我的控制台应该是:
0
1
2
Notification[2] (where the first element is 2 and the second is the error notification
这是我的:
const first = interval(1000).pipe(take(5));
source.subscribe({
next(response) {
console.log("First: ", response);
// Process the response
// .......
}
});
// Create a second Observable from the first that only emits the last two messages
// from the first Observable
const second = first.pipe(materialize(), bufferCount(2), last());
second.subscribe({
next(response) {
console.log("Second: ", response);
}
});
这是订阅两个链式 Observable 的最简单方法吗?我假设我可以在某个地方使用 flatMap
或 concat
,但是我有两个不同类型的 Observable,所以我不确定它是如何工作的。我想我最困扰的是我在这里有两个相关的可观察量订阅。
import { of } from 'rxjs';
import { takeLast } from 'rxjs/operators';
const firstSource$ = of(1, 2, 3, 4, 5, 'error-message');
const secondSource$ = firstSource$.pipe(
takeLast(2) // take the last 2 values
);
secondSource$.subscribe((result) => console.log(result));
// output:
// 5
// 'error-message'
一个RxJS#Observable
有3种排放。
- next :一个 Observable 可以有任意数量的
next
发射。这意味着 0 排放的可观察对象是完全没问题的。发射本身带有一个值。如果您使用的是 TypeScript,则此值的类型以 Observable 的类型编码。发出数字的流:
const stream: Observable<number> = of(42);
complete :一个 Observable 只能发送 1 complete
发射。在这次发射之后,observable 完成并且永远不会再发射任何其他东西。发射本身没有价值。
error :这与 complete 完全相同,只是它有一个值。通常,该值是错误消息或对象(因此是此发射的名称)。如果您使用的是 TypeScript,则此值的类型不会编码为 Observable 的类型(类似于函数 throw
的错误)。请记住,(如完成) 在这次发射之后,可观察对象已完成并且永远不会再发射任何其他东西。
记住错误前的最后一个值
有很多方法可以解决这个问题。考虑一下:
// Emit next once per sceond
const a$ = interval(1000);
// Never emits a next, emits an error after 5.5seconds
const b$ = timer(5500).pipe(
tap(_ => {throw "This is an error"})
);
merge(a$, b$).subscribe({
next: value => console.log("next: ", value),
complete: () => console.log("terminal emission (complete)"),
error: e => console.log("terminal emission (error): ", e)
});
输出:
next: 0
next: 1
next: 2
next: 3
next: 4
terminal emission (error): This is an error
错误是一个终端发射,就像完成一样,但大多数操作员的行为方式是他们自己出错,错误发射到达。所以 takeLast(2)
等待 complete
发射,如果它收到 error
发射就会出错。
您可以使用 catchError
将错误转换为另一个流。如果您选择 completes
而不是 errors
的流,您可以恢复到更正常的行为。
解决方案 我创建了一个自定义运算符来执行您所描述的(我认为)。除非抛出错误,否则它会保持源可观察性不变。它重新抛出相同的错误,但添加了 previousValue
。这里正在使用:
// Custom operator that will embelish any error thrown with the last
// cached value emitted. lastValue = null if error is thrown before
// any values are emitted.
function embelishErrors<T>(): MonoTypeOperatorFunction<T> {
return s => defer(() => {
let cache: T | null = null;
return s.pipe(
tap(v => cache = v),
catchError(e => throwError(() => ({
lastValue: cache,
error: e
})))
);
})
}
// Emit next once per sceond
const a$ = interval(1000);
// Never emits a next, emits an error after 5.5seconds
const b$ = timer(5500).pipe(
tap(_ => {throw "This is an error"})
);
merge(a$, b$).pipe(
embelishErrors()
).subscribe({
next: value => console.log("next: ", value),
complete: () => console.log("terminal emission (complete)"),
error: e => console.log("terminal emission (error): ", e)
});
输出:
next: 0
next: 1
next: 2
next: 3
next: 4
terminal emission (error): { lastValue: 4, error: 'This is an error' }
我有两个 Observable,我正试图将它们链接在一起。我需要订阅两者并根据两者发出的值采取行动。我觉得我在我的设置中做错了什么,想问问这是否是完成此操作的最佳方法。
基本上我有一个 Observable 从流中发出值。在第二个值中,我想要从第一个 Observable 发出的最后两个值。这是因为如果第一个 Observable 抛出错误,我需要知道在抛出错误之前响应是什么。
此外,我希望它们按顺序执行,所以如果 first
observable 发出:
[0, 1, 2, <error>]
我的控制台应该是:
0
1
2
Notification[2] (where the first element is 2 and the second is the error notification
这是我的:
const first = interval(1000).pipe(take(5));
source.subscribe({
next(response) {
console.log("First: ", response);
// Process the response
// .......
}
});
// Create a second Observable from the first that only emits the last two messages
// from the first Observable
const second = first.pipe(materialize(), bufferCount(2), last());
second.subscribe({
next(response) {
console.log("Second: ", response);
}
});
这是订阅两个链式 Observable 的最简单方法吗?我假设我可以在某个地方使用 flatMap
或 concat
,但是我有两个不同类型的 Observable,所以我不确定它是如何工作的。我想我最困扰的是我在这里有两个相关的可观察量订阅。
import { of } from 'rxjs';
import { takeLast } from 'rxjs/operators';
const firstSource$ = of(1, 2, 3, 4, 5, 'error-message');
const secondSource$ = firstSource$.pipe(
takeLast(2) // take the last 2 values
);
secondSource$.subscribe((result) => console.log(result));
// output:
// 5
// 'error-message'
一个RxJS#Observable
有3种排放。
- next :一个 Observable 可以有任意数量的
next
发射。这意味着 0 排放的可观察对象是完全没问题的。发射本身带有一个值。如果您使用的是 TypeScript,则此值的类型以 Observable 的类型编码。发出数字的流:
const stream: Observable<number> = of(42);
complete :一个 Observable 只能发送 1
complete
发射。在这次发射之后,observable 完成并且永远不会再发射任何其他东西。发射本身没有价值。error :这与 complete 完全相同,只是它有一个值。通常,该值是错误消息或对象(因此是此发射的名称)。如果您使用的是 TypeScript,则此值的类型不会编码为 Observable 的类型(类似于函数
throw
的错误)。请记住,(如完成) 在这次发射之后,可观察对象已完成并且永远不会再发射任何其他东西。
记住错误前的最后一个值
有很多方法可以解决这个问题。考虑一下:
// Emit next once per sceond
const a$ = interval(1000);
// Never emits a next, emits an error after 5.5seconds
const b$ = timer(5500).pipe(
tap(_ => {throw "This is an error"})
);
merge(a$, b$).subscribe({
next: value => console.log("next: ", value),
complete: () => console.log("terminal emission (complete)"),
error: e => console.log("terminal emission (error): ", e)
});
输出:
next: 0
next: 1
next: 2
next: 3
next: 4
terminal emission (error): This is an error
错误是一个终端发射,就像完成一样,但大多数操作员的行为方式是他们自己出错,错误发射到达。所以 takeLast(2)
等待 complete
发射,如果它收到 error
发射就会出错。
您可以使用 catchError
将错误转换为另一个流。如果您选择 completes
而不是 errors
的流,您可以恢复到更正常的行为。
解决方案 我创建了一个自定义运算符来执行您所描述的(我认为)。除非抛出错误,否则它会保持源可观察性不变。它重新抛出相同的错误,但添加了 previousValue
。这里正在使用:
// Custom operator that will embelish any error thrown with the last
// cached value emitted. lastValue = null if error is thrown before
// any values are emitted.
function embelishErrors<T>(): MonoTypeOperatorFunction<T> {
return s => defer(() => {
let cache: T | null = null;
return s.pipe(
tap(v => cache = v),
catchError(e => throwError(() => ({
lastValue: cache,
error: e
})))
);
})
}
// Emit next once per sceond
const a$ = interval(1000);
// Never emits a next, emits an error after 5.5seconds
const b$ = timer(5500).pipe(
tap(_ => {throw "This is an error"})
);
merge(a$, b$).pipe(
embelishErrors()
).subscribe({
next: value => console.log("next: ", value),
complete: () => console.log("terminal emission (complete)"),
error: e => console.log("terminal emission (error): ", e)
});
输出:
next: 0
next: 1
next: 2
next: 3
next: 4
terminal emission (error): { lastValue: 4, error: 'This is an error' }