Rxjs Observer 过滤器无法处理错误

Rxjs Observer filter not working for error

我的服务中有以下代码

        let b = new BehaviorSubject({ a: undefined })
        let o = b.asObservable();
        o.pipe(filter(_ => _.a === 5)).subscribe(() => {
            debugger;
        }, error => {
            debugger
        })
        b.next({ a: 10 })
        b.next({ a: 5 })
        b.error({ a: 10 })

当我调用 b.next({a:10}) 时,它不会在 onNext 回调中触发调试器 当我调用 b.next({a:5}) 时,它会触发调试器onNext 回调。 当我调用 b.error({a:10}) 它会在 onError 回调中触发调试器。

我的预期是不应该调用 onError 回调,因为不满足过滤条件。但是,显然我这里有问题。

如何过滤错误?

提前致谢。

您无法过滤 error 的原因与您无法过滤 complete 的原因相同。这没有意义。它们表示流的结束。不能过滤结尾。

当然,您可以捕获错误然后什么都不做 - 这感觉有点像过滤错误。

o.pipe(
  catchError(err => 
    err?.a === 5 ?
    return of(err) :
    EMPTY
  ),
  filter(val => val.a === 5)
).subscribe({
  next: val => debugger,
  error: err => debugger,
  complete: () => debugger
});

当然,您在错误或完成后发送给主题的任何内容都不会执行任何操作。

    b.next({ a: 10 }); // Filtered, not emitted
    b.next({ a: 5 }); // value emitted
    b.error({ a: 5 }); // caught and new non-error { a: 5 } emitted as value. subject 'b' is done
    b.next({ a: 5 }); // Does nothing

最后一次调用不会执行任何操作,因为主题有 errored/completed。

同理:

    b.next({ a: 10 }); // Filtered, not emitted
    b.next({ a: 5 }); // value emitted
    b.complete(); // subject 'b' is done
    b.next({ a: 5 }); // Does nothing

最后:

    b.next({ a: 10 }); // Filtered, not emitted
    b.next({ a: 5 }); // value emitted
    b.error({ a: 10 }); // caught and not emitted. subject 'b' is done
    b.next({ a: 5 }); // Does nothing