获取链式 Observable 发出的最后两个值

Getting last two values emitted from a chained Observable

我有两个 Observable,我正试图将它们链接在一起。我需要订阅两者并根据两者发出的值采取行动。我觉得我在我的设置中做错了什么,想问问这是否是完成此操作的最佳方法。

基本上我有一个 Observable 从流中发出值。在第二个值中,我想要从第一个 Observable 发出的最后两个值。这是因为如果第一个 Observable 抛出错误,我需要知道在抛出错误之前响应是什么。

此外,我希望它们按顺序执行,所以如果 first observable 发出:

[0, 1, 2, <error>]

我的控制台应该是:

0
1
2
Notification[2] (where the first element is 2 and the second is the error notification

这是我的:

const first = interval(1000).pipe(take(5));

source.subscribe({
  next(response) {
    console.log("First: ", response);

    // Process the response
    // .......
  }
});


// Create a second Observable from the first that only emits the last two messages
// from the first Observable
const second = first.pipe(materialize(), bufferCount(2), last());

second.subscribe({
  next(response) {
    console.log("Second: ", response);
  }
});

这是订阅两个链式 Observable 的最简单方法吗?我假设我可以在某个地方使用 flatMapconcat,但是我有两个不同类型的 Observable,所以我不确定它是如何工作的。我想我最困扰的是我在这里有两个相关的可观察量订阅。

import { of } from 'rxjs';
import { takeLast } from 'rxjs/operators';

const firstSource$ = of(1, 2, 3, 4, 5, 'error-message');
const secondSource$ = firstSource$.pipe(
 takeLast(2) // take the last 2 values 
);

secondSource$.subscribe((result) => console.log(result));
// output: 
// 5
// 'error-message'

一个RxJS#Observable有3种排放。

  1. next :一个 Observable 可以有任意数量的 next 发射。这意味着 0 排放的可观察对象是完全没问题的。发射本身带有一个值。如果您使用的是 TypeScript,则此值的类型以 Observable 的类型编码。发出数字的流:
    const stream: Observable<number> = of(42);
  1. complete :一个 Observable 只能发送 1 complete 发射。在这次发射之后,observable 完成并且永远不会再发射任何其他东西。发射本身没有价值。

  2. error :这与 complete 完全相同,只是它有一个值。通常,该值是错误消息或对象(因此是此发射的名称)。如果您使用的是 TypeScript,则此值的类型不会编码为 Observable 的类型(类似于函数 throw 的错误)。请记住,(如完成) 在这次发射之后,可观察对象已完成并且永远不会再发射任何其他东西。


记住错误前的最后一个值

有很多方法可以解决这个问题。考虑一下:

// Emit next once per sceond
const a$ = interval(1000);
// Never emits a next, emits an error after 5.5seconds
const b$ = timer(5500).pipe(
  tap(_ => {throw "This is an error"})
);

merge(a$, b$).subscribe({
  next: value => console.log("next: ", value),
  complete: () => console.log("terminal emission (complete)"),
  error: e => console.log("terminal emission (error): ", e)
});

输出:

next:  0
next:  1
next:  2
next:  3
next:  4
terminal emission (error):  This is an error

错误是一个终端发射,就像完成一样,但大多数操作员的行为方式是他们自己出错,错误发射到达。所以 takeLast(2) 等待 complete 发射,如果它收到 error 发射就会出错。

您可以使用 catchError 将错误转换为另一个流。如果您选择 completes 而不是 errors 的流,您可以恢复到更正常的行为。

解决方案 我创建了一个自定义运算符来执行您所描述的(我认为)。除非抛出错误,否则它会保持源可观察性不变。它重新抛出相同的错误,但添加了 previousValue。这里正在使用:

// Custom operator that will embelish any error thrown with the last
// cached value emitted. lastValue = null if error is thrown before
// any values are emitted.
function embelishErrors<T>(): MonoTypeOperatorFunction<T> {
  return s => defer(() => {
    let cache: T | null = null;
    return s.pipe(
      tap(v => cache = v),
      catchError(e => throwError(() => ({
        lastValue: cache, 
        error: e
      })))
    );
  })
}

// Emit next once per sceond
const a$ = interval(1000);
// Never emits a next, emits an error after 5.5seconds
const b$ = timer(5500).pipe(
  tap(_ => {throw "This is an error"})
);

merge(a$, b$).pipe(
  embelishErrors()
).subscribe({
  next: value => console.log("next: ", value),
  complete: () => console.log("terminal emission (complete)"),
  error: e => console.log("terminal emission (error): ", e)
});

输出:

next:  0
next:  1
next:  2
next:  3
next:  4
terminal emission (error):  { lastValue: 4, error: 'This is an error' }