RxJS 订阅只能工作一次
RxJS Subscription only works one time
在我的 Angular 应用程序中,我正在手动验证来自 JSON 的一些表单数据。
因此,我正在订阅一个接收更改事件的主题:
private _contentChanged = new Subject<IEditorContentChangedEvent>();
contentChanged$: Observable<IEditorContentChangedEvent>;
constructor() {
this.contentChanged$ = this._contentChanged.asObservable();
}
onContentChanged(event: IEditorContentChangedEvent): void {
this._contentChanged.next(event);
}
ngOnInit(): void {
this.contentChanged$
.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
console.log(this.resource);
this.parentForm.setValue(this.resource);
return of(event);
}),
catchError((error: Error) => {
...
return EMPTY;
}),
)
.subscribe(() => {
...
});
}
我在 map()
中看到日志语句,这是第一次 contentChanged$ 收到一个值,但如果 map
中的代码抛出错误,则第二次看不到。
我假设这是因为 catchError()
returns EMPTY
。我一直在关注 this article on Medium 等指南,但到目前为止运气不佳。
我尝试在 catchError()
处理程序中返回:of(error)
,但这也不起作用。
如果我只是订阅 contentChanged$
而没有任何运算符,我每次都会看到 _contentChanged
收到一个新值,所以我的数据源按预期工作。
如果我进行的更改不会导致 catchError
被调用,我会在 map
中看到日志语句,因此必须是 catchError
关闭流。
如何让 Observable/Subscription 保持活动状态以处理新的 contentChanged$
值?
由于您的评论 setValue
抛出错误,该错误在 CathError
中被捕获并且根据可观察到的行为 source stream 将在任何之后消失源流链(管道)中的错误类型或源流完成
In-order 要保持流活动,您需要在内部处理此错误 -
- 如果任何可观察到的子项导致错误,则
switchMap/ConcatMap...(() => child$.pipe(catchError(...))
- 在普通 javascript 错误的情况下用
try catch block
包装它
你的情况是 JSON.parse
错误,这是一个 javascript 错误,所以用 try catch
包裹起来
map(() => {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
console.log(this.resource);
this.parentForm.setValue(this.resource);
return of(event);
}),
用下面的代码修改上面的代码片段
map(() => {
try {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
} catch(err){
console.error(err);
}
console.log(this.resource);
this.parentForm.setValue(this.resource);
return event;
}),
可观察合约
这里是概述:link
相关部分:
Upon issuing an complete or error notification, an observable may not thereafter issue any further notifications.
这意味着一旦 observable 完成或出错,它就完成了。他们是终端emissions/notifications.
一个解决方案:
不要让您的可观察对象发出错误。如果您知道地图中的某些同步代码正在抛出错误,您可以捕获它并在那里处理它,然后它永远不会传播到您的可观察对象中:
try {
this.resource = JSON.parse(/* ... */);
} catch(err){
console.error(err);
}
另一个解决方案
一旦源可观察错误,只需re-subscribe到您的源可观察。这是否有效取决于您首先订阅您的来源时产生的副作用。
this.contentChanged$.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
...
}),
tap({error: (error: Error) => console.error(error) }),
// By default, if contentChanged$ keeps erroring, this will
// keep retrying for forever.
retry()
).subscribe(() => {
...
});
您还可以通过从 catchError
返回一个可观察对象来有条件地重试
this.contentChanged$.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
...
}),
catchError((error: Error, source$) => {
console.error(error)
if (/* retry? */){
return source$;
} else if (/* Do nothing? */) {
return EMPTY;
} else if (/* Emit some string values? */) {
return of("Hello", "World");
}
// Otherwise rethrow the error!
return throwError(() => error);
})
).subscribe(() => {
...
});
在我的 Angular 应用程序中,我正在手动验证来自 JSON 的一些表单数据。
因此,我正在订阅一个接收更改事件的主题:
private _contentChanged = new Subject<IEditorContentChangedEvent>();
contentChanged$: Observable<IEditorContentChangedEvent>;
constructor() {
this.contentChanged$ = this._contentChanged.asObservable();
}
onContentChanged(event: IEditorContentChangedEvent): void {
this._contentChanged.next(event);
}
ngOnInit(): void {
this.contentChanged$
.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
console.log(this.resource);
this.parentForm.setValue(this.resource);
return of(event);
}),
catchError((error: Error) => {
...
return EMPTY;
}),
)
.subscribe(() => {
...
});
}
我在 map()
中看到日志语句,这是第一次 contentChanged$ 收到一个值,但如果 map
中的代码抛出错误,则第二次看不到。
我假设这是因为 catchError()
returns EMPTY
。我一直在关注 this article on Medium 等指南,但到目前为止运气不佳。
我尝试在 catchError()
处理程序中返回:of(error)
,但这也不起作用。
如果我只是订阅 contentChanged$
而没有任何运算符,我每次都会看到 _contentChanged
收到一个新值,所以我的数据源按预期工作。
如果我进行的更改不会导致 catchError
被调用,我会在 map
中看到日志语句,因此必须是 catchError
关闭流。
如何让 Observable/Subscription 保持活动状态以处理新的 contentChanged$
值?
由于您的评论 setValue
抛出错误,该错误在 CathError
中被捕获并且根据可观察到的行为 source stream 将在任何之后消失源流链(管道)中的错误类型或源流完成
In-order 要保持流活动,您需要在内部处理此错误 -
- 如果任何可观察到的子项导致错误,则
switchMap/ConcatMap...(() => child$.pipe(catchError(...))
- 在普通 javascript 错误的情况下用
try catch block
包装它
你的情况是 JSON.parse
错误,这是一个 javascript 错误,所以用 try catch
map(() => {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
console.log(this.resource);
this.parentForm.setValue(this.resource);
return of(event);
}),
用下面的代码修改上面的代码片段
map(() => {
try {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
} catch(err){
console.error(err);
}
console.log(this.resource);
this.parentForm.setValue(this.resource);
return event;
}),
可观察合约
这里是概述:link
相关部分:
Upon issuing an complete or error notification, an observable may not thereafter issue any further notifications.
这意味着一旦 observable 完成或出错,它就完成了。他们是终端emissions/notifications.
一个解决方案:
不要让您的可观察对象发出错误。如果您知道地图中的某些同步代码正在抛出错误,您可以捕获它并在那里处理它,然后它永远不会传播到您的可观察对象中:
try {
this.resource = JSON.parse(/* ... */);
} catch(err){
console.error(err);
}
另一个解决方案
一旦源可观察错误,只需re-subscribe到您的源可观察。这是否有效取决于您首先订阅您的来源时产生的副作用。
this.contentChanged$.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
...
}),
tap({error: (error: Error) => console.error(error) }),
// By default, if contentChanged$ keeps erroring, this will
// keep retrying for forever.
retry()
).subscribe(() => {
...
});
您还可以通过从 catchError
this.contentChanged$.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
...
}),
catchError((error: Error, source$) => {
console.error(error)
if (/* retry? */){
return source$;
} else if (/* Do nothing? */) {
return EMPTY;
} else if (/* Emit some string values? */) {
return of("Hello", "World");
}
// Otherwise rethrow the error!
return throwError(() => error);
})
).subscribe(() => {
...
});