如何在 RxJs Observable 构造函数中使用异步调用
How to use an async calling inside of an RxJs Observable constructor
我正在使用 NgRx 在应用程序中设置 effect
。
在效果期间,我需要获取一些存储值,做一些逻辑,然后根据我的逻辑调度一个或多个动作 运行.
所以,为了更容易制作,我正在使用 dispatch: false
的效果。
public getCachedOrServerValue$ = createEffect(() =>
this.actions$.pipe(
ofType(myActions.getCachedOrServerValue),
switchMap((_) => this.getCachedOrServerValue())
), { dispatch: false }
);
所以在上面我会调用getCachedOrServerValue
。我想使用 switchMap
(而不仅仅是 tap
)所以如果我在前一个完成之前得到一个新的 myActions.getCachedOrServerValue
,它将取消之前的订阅(根据我对 switchMap 的理解) .
我们调用的函数如下所示:
private getCachedOrServerValue(): Observable<void> {
const result$ = new Observable<void>(async (subscriber) => { // <---- I need this to be async so I can use await
// First see if we can get it from the store
const cached = await this.storeUtils.getStoreValue(fromApp.getValue);
if (cached) {
this.store$.dispatch(myActions.getCachedOrServerValueSuccess(cached));
subscriber.next();
return;
}
// Need to request from the server
const name = await this.storeUtils.getStoreValue(fromApp.getSiteName);
// storeUtils.getStoreValue is just a helper we have to get a single value as a Promise
const valueId = await this.storeUtils.getStoreValue(fromApp.getValueId);
const serverValue = await this.myService.getValue(namevalueId);
this.store$.dispatch(myActions.storeValue(serverValue));
this.store$.dispatch(myActions.getCachedOrServerValueSuccess(cached));
});
return result$;
}
首先,我封装了一个 observable,据我所知,我们需要一个 observable 才能在 switchMap
.
中使用此调用
其次(我的实际问题),我希望能够在 Observable subscriber
构造函数参数中使用 await
,但是 VSCode 报错。
错误详情:
Argument of type '(this: Observable, subscriber: Subscriber) => Promise' is not assignable to parameter of type '(this: Observable, subscriber: Subscriber) => TeardownLogic'.
Type 'Promise' is not assignable to type 'TeardownLogic'.
Property 'unsubscribe' is missing in type 'Promise' but required in type 'Unsubscribable'.ts(2345)
types.d.ts(31, 5): 'unsubscribe' 在这里声明。
这里有什么问题,如何使用如上所示的 aysnc/await
?
更新
根据@StPaulis 的回答,我可以避免这样做(至少在这种情况下)。
我可以像下面那样做(还没有测试,但肯定没问题):
public getCachedOrServerValue$ = createEffect(() =>
this.actions$.pipe(
ofType(myActions.getCachedOrServerValue),
switchMap(_ => from(this.getCachedOrServerValue()).pipe( // <-- use a from here
map(data => myActions.getCachedOrServerValueSuccess(data)),
catchError(error => of(myActions.getCachedOrServerValueFail(error)))
))));
private async getCachedOrServerValue(): Promise<DataState>{
// First see if we can get it from the store
const cached = await this.storeUtils.getStoreValue(fromApp.getValue);
if (cached) {
return cached;
}
// Need to request from the server
const name = await this.storeUtils.getStoreValue(fromApp.getName);
const valueId = await this.storeUtils.getStoreValue(fromApp.getValueId);
const serverValue = await this.myService.getValue(name, valueId);
return serverValue;
}
在我看来,将 observables 与 promises 混合从来都不是一个好主意。
我建议您使用 RxJs | from 将您的承诺转换为一个可观察对象并在您的管道中使用它,而不是创建一个新的可观察对象并将所有逻辑包装在里面。
RxJs 旨在取代(异步 | 等待)功能。
我正在使用 NgRx 在应用程序中设置 effect
。
在效果期间,我需要获取一些存储值,做一些逻辑,然后根据我的逻辑调度一个或多个动作 运行.
所以,为了更容易制作,我正在使用 dispatch: false
的效果。
public getCachedOrServerValue$ = createEffect(() =>
this.actions$.pipe(
ofType(myActions.getCachedOrServerValue),
switchMap((_) => this.getCachedOrServerValue())
), { dispatch: false }
);
所以在上面我会调用getCachedOrServerValue
。我想使用 switchMap
(而不仅仅是 tap
)所以如果我在前一个完成之前得到一个新的 myActions.getCachedOrServerValue
,它将取消之前的订阅(根据我对 switchMap 的理解) .
我们调用的函数如下所示:
private getCachedOrServerValue(): Observable<void> {
const result$ = new Observable<void>(async (subscriber) => { // <---- I need this to be async so I can use await
// First see if we can get it from the store
const cached = await this.storeUtils.getStoreValue(fromApp.getValue);
if (cached) {
this.store$.dispatch(myActions.getCachedOrServerValueSuccess(cached));
subscriber.next();
return;
}
// Need to request from the server
const name = await this.storeUtils.getStoreValue(fromApp.getSiteName);
// storeUtils.getStoreValue is just a helper we have to get a single value as a Promise
const valueId = await this.storeUtils.getStoreValue(fromApp.getValueId);
const serverValue = await this.myService.getValue(namevalueId);
this.store$.dispatch(myActions.storeValue(serverValue));
this.store$.dispatch(myActions.getCachedOrServerValueSuccess(cached));
});
return result$;
}
首先,我封装了一个 observable,据我所知,我们需要一个 observable 才能在 switchMap
.
其次(我的实际问题),我希望能够在 Observable subscriber
构造函数参数中使用 await
,但是 VSCode 报错。
错误详情:
Argument of type '(this: Observable, subscriber: Subscriber) => Promise' is not assignable to parameter of type '(this: Observable, subscriber: Subscriber) => TeardownLogic'. Type 'Promise' is not assignable to type 'TeardownLogic'. Property 'unsubscribe' is missing in type 'Promise' but required in type 'Unsubscribable'.ts(2345)
types.d.ts(31, 5): 'unsubscribe' 在这里声明。
这里有什么问题,如何使用如上所示的 aysnc/await
?
更新
根据@StPaulis 的回答,我可以避免这样做(至少在这种情况下)。
我可以像下面那样做(还没有测试,但肯定没问题):
public getCachedOrServerValue$ = createEffect(() =>
this.actions$.pipe(
ofType(myActions.getCachedOrServerValue),
switchMap(_ => from(this.getCachedOrServerValue()).pipe( // <-- use a from here
map(data => myActions.getCachedOrServerValueSuccess(data)),
catchError(error => of(myActions.getCachedOrServerValueFail(error)))
))));
private async getCachedOrServerValue(): Promise<DataState>{
// First see if we can get it from the store
const cached = await this.storeUtils.getStoreValue(fromApp.getValue);
if (cached) {
return cached;
}
// Need to request from the server
const name = await this.storeUtils.getStoreValue(fromApp.getName);
const valueId = await this.storeUtils.getStoreValue(fromApp.getValueId);
const serverValue = await this.myService.getValue(name, valueId);
return serverValue;
}
在我看来,将 observables 与 promises 混合从来都不是一个好主意。
我建议您使用 RxJs | from 将您的承诺转换为一个可观察对象并在您的管道中使用它,而不是创建一个新的可观察对象并将所有逻辑包装在里面。
RxJs 旨在取代(异步 | 等待)功能。