RxJS 订阅只能工作一次

RxJS Subscription only works one time

在我的 Angular 应用程序中,我正在手动验证来自 JSON 的一些表单数据。

因此,我正在订阅一个接收更改事件的主题:

private _contentChanged = new Subject<IEditorContentChangedEvent>();
contentChanged$: Observable<IEditorContentChangedEvent>;

constructor() {
  this.contentChanged$ = this._contentChanged.asObservable();
}

onContentChanged(event: IEditorContentChangedEvent): void {
  this._contentChanged.next(event);
}

ngOnInit(): void {
  this.contentChanged$
      .pipe(
        untilDestroyed(this),
        debounceTime(1000),
        filter(event => Boolean(event)),
        map(() => {
          this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
          console.log(this.resource);
          this.parentForm.setValue(this.resource);

          return of(event);
        }),
        catchError((error: Error) => {
          ...

          return EMPTY;
        }),
      )
      .subscribe(() => {
        ...
      });
}

我在 map() 中看到日志语句,这是第一次 contentChanged$ 收到一个值,但如果 map 中的代码抛出错误,则第二次看不到。

我假设这是因为 catchError() returns EMPTY。我一直在关注 this article on Medium 等指南,但到目前为止运气不佳。

我尝试在 catchError() 处理程序中返回:of(error),但这也不起作用。

如果我只是订阅 contentChanged$ 而没有任何运算符,我每次都会看到 _contentChanged 收到一个新值,所以我的数据源按预期工作。

如果我进行的更改不会导致 catchError 被调用,我会在 map 中看到日志语句,因此必须是 catchError 关闭流。

如何让 Observable/Subscription 保持活动状态以处理新的 contentChanged$ 值?

由于您的评论 setValue 抛出错误,该错误在 CathError 中被捕获并且根据可观察到的行为 source stream 将在任何之后消失源流链(管道)中的错误类型或源流完成

In-order 要保持流活动,您需要在内部处理此错误 -

  • 如果任何可观察到的子项导致错误,则

switchMap/ConcatMap...(() => child$.pipe(catchError(...))

  • 在普通 javascript 错误的情况下用 try catch block
  • 包装它

你的情况是 JSON.parse 错误,这是一个 javascript 错误,所以用 try catch

包裹起来
  map(() => {
    this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
    console.log(this.resource);
    this.parentForm.setValue(this.resource);

    return of(event);
  }),

用下面的代码修改上面的代码片段

  map(() => {
    try {
      this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
    } catch(err){
      console.error(err);
    }
    console.log(this.resource);
    this.parentForm.setValue(this.resource);

    return event;
  }),

可观察合约

这里是概述:link

相关部分:

Upon issuing an complete or error notification, an observable may not thereafter issue any further notifications.

这意味着一旦 observable 完成或出错,它就完成了。他们是终端emissions/notifications.


一个解决方案:

不要让您的可观察对象发出错误。如果您知道地图中的某些同步代码正在抛出错误,您可以捕获它并在那里处理它,然后它永远不会传播到您的可观察对象中:

try {
  this.resource = JSON.parse(/* ... */);
} catch(err){
  console.error(err);
}

另一个解决方案

一旦源可观察错误,只需re-subscribe到您的源可观察。这是否有效取决于您首先订阅您的来源时产生的副作用。

this.contentChanged$.pipe(
  untilDestroyed(this),
  debounceTime(1000),
  filter(event => Boolean(event)),
  map(() => {
    ...
  }),
  tap({error: (error: Error) => console.error(error) }),
  // By default, if contentChanged$ keeps erroring, this will
  // keep retrying for forever.
  retry()
).subscribe(() => {
  ...
});

您还可以通过从 catchError

返回一个可观察对象来有条件地重试
this.contentChanged$.pipe(
  untilDestroyed(this),
  debounceTime(1000),
  filter(event => Boolean(event)),
  map(() => {
    ...
  }),
  catchError((error: Error, source$) => {
    console.error(error)
    if (/* retry? */){
      return source$;
    } else if (/* Do nothing? */) {
      return EMPTY;
    } else if (/* Emit some string values? */) {
      return of("Hello", "World");
    } 

    // Otherwise rethrow the error!
    return throwError(() => error);
  })
).subscribe(() => {
  ...
});