如何使用我的第一个自定义运算符正确扩展 rxjs/Observable
How do I properly extend rxjs/Observable with my first custom operator
我正在尝试在我的服务调用中使用重试模式(实际上:ngrx/store 中的@Effects)并增加延迟间隔。因为我设法为一个调用想出了一个工作代码(即使它看起来没有优化我不想在我的问题中关注这个),我现在想将它提取到一个自定义的 Observable 运算符中,并重复使用它在我所有的服务电话中。
如何为新运算符设计 API/usage 以及如何使其被 TypeScript 识别,我一头雾水。
下面的代码肯定行不通,因为它可能积累了很多问题。
所以,现在我有一个 call/Effect 如下:
@Effect()
loadData$: Observable<Action> = this.actions$
.ofType(ActionTypes.LOAD_DATA)
.pluck('payload')
.switchMap(params => {
return this.myService.getData(params)
.map(res => new LoadDataCompleteAction(res))
// ...and this part would have to be extracted:
.retryWhen(attempts => Observable
.zip(attempts, Observable.range(1, 5))
.flatMap((n, i) => {
if (i < 4) {
return Observable.timer(1000 * i);
} else {
throw(n);
}
})
)
})
.catch(err => Observable.of(new LoadDataFailed()));
我所追求的是能够在其他效果中重用重试部分,并具有类似于以下的模式:
@Effect()
loadData$: Observable<Action> = this.actions$
.ofType(ActionTypes.LOAD_DATA)
.pluck('payload')
.switchMap(params => {
return this.myService.getData(params)
.map(res => new LoadDataCompleteAction(res))
.retryWhen(attempts => Observable.retryOrThrow(attempts, maxAttempts)
// or maybe - that's my design question
.retryOrThrow(attempts, maxAttempts)
})
.catch(err => Observable.of(new LoadDataFailed()));
为简单起见,我们可以假设延迟回调模式 (i * 1000
) 对于整个应用程序都是不变的。
下面的代码是我的尝试,但是显然不行。
declare module 'rxjs/Observable' {
interface Observable<T> {
retryOrThrow<T>(attempts: any, max: number): Observable<T>;
}
}
Observable.prototype.retryOrThrow = function(attempt, max) {
console.log('retryOrThrow called');
return Observable.create(subscriber => {
const source = this;
const subscription = source.subscribe(() => {
// important: catch errors from user-provided callbacks
try {
subscriber
.zip(attempt, Observable.range(1, max + 1))
.flatMap((n, i) => {
console.log(n, i);
if (i < max) {
return Observable.timer(1000 * i);
} else {
throw(n);
}
});
} catch (err) {
subscriber.error(err);
}
},
// be sure to handle errors and completions as appropriate and send them along
err => subscriber.error(err),
() => subscriber.complete());
// to return now
return subscription;
});
};
- 我不确定如何为 new 运算符设计 API,哪种语法最适合这里。
- 我不知道如何正确声明新运算符和 Observable 命名空间或模块,以便 TypeScript 识别新内容。
更新的服务电话:
getMocky(){
const u = Math.random();
const okUrl = 'http://www.mocky.io/v2/58ffadf71100009b17f60044';
const erUrl = 'http://www.mocky.io/v2/58ffae7f110000ba17f60046';
return u > 0.6 ? this.http.get(okUrl) : this.http.get(erUrl);
}
我无法直接回答你的问题,因为我不知道如何使用自定义运算符扩展 rxjs。
幸运的是,您(和我)不需要知道。
您真正要问的是如何预定义一个运算符链以在多个地方重用。
你只需要let-Operator,而且使用起来非常简单。
首先将要重用的逻辑提取到 returns 可观察的函数中:
function retryOrThrow(maxAttempts, timeout) {
return (source) =>
source
.retryWhen(attempts => Observable
.zip(attempts, Observable.range(1, maxAttempts + 1))
.flatMap((n, i) => {
if (i < maxAttempts) {
return Observable.timer(timeout * i);
} else {
throw (n);
}
})
);
}
使用效果中的函数使用 let
:
@Effect()
loadData$: Observable<Action> = this.actions$
.ofType(ActionTypes.LOAD_DATA)
.pluck('payload')
.switchMap(params => {
return this.myService.getData(params)
.map(res => new LoadDataCompleteAction(res))
// inject the pre-defined logic with the desired parameters
.let(retryOrThrow(5, 1000))
})
.catch(err => Observable.of(new LoadDataFailed()));
了解 let
功能的最简单方法是查看它的 source-code。这真的很简单,因为它所做的就是将给定的函数应用于源可观察对象。
我正在尝试在我的服务调用中使用重试模式(实际上:ngrx/store 中的@Effects)并增加延迟间隔。因为我设法为一个调用想出了一个工作代码(即使它看起来没有优化我不想在我的问题中关注这个),我现在想将它提取到一个自定义的 Observable 运算符中,并重复使用它在我所有的服务电话中。
如何为新运算符设计 API/usage 以及如何使其被 TypeScript 识别,我一头雾水。
下面的代码肯定行不通,因为它可能积累了很多问题。
所以,现在我有一个 call/Effect 如下:
@Effect()
loadData$: Observable<Action> = this.actions$
.ofType(ActionTypes.LOAD_DATA)
.pluck('payload')
.switchMap(params => {
return this.myService.getData(params)
.map(res => new LoadDataCompleteAction(res))
// ...and this part would have to be extracted:
.retryWhen(attempts => Observable
.zip(attempts, Observable.range(1, 5))
.flatMap((n, i) => {
if (i < 4) {
return Observable.timer(1000 * i);
} else {
throw(n);
}
})
)
})
.catch(err => Observable.of(new LoadDataFailed()));
我所追求的是能够在其他效果中重用重试部分,并具有类似于以下的模式:
@Effect()
loadData$: Observable<Action> = this.actions$
.ofType(ActionTypes.LOAD_DATA)
.pluck('payload')
.switchMap(params => {
return this.myService.getData(params)
.map(res => new LoadDataCompleteAction(res))
.retryWhen(attempts => Observable.retryOrThrow(attempts, maxAttempts)
// or maybe - that's my design question
.retryOrThrow(attempts, maxAttempts)
})
.catch(err => Observable.of(new LoadDataFailed()));
为简单起见,我们可以假设延迟回调模式 (i * 1000
) 对于整个应用程序都是不变的。
下面的代码是我的尝试,但是显然不行。
declare module 'rxjs/Observable' {
interface Observable<T> {
retryOrThrow<T>(attempts: any, max: number): Observable<T>;
}
}
Observable.prototype.retryOrThrow = function(attempt, max) {
console.log('retryOrThrow called');
return Observable.create(subscriber => {
const source = this;
const subscription = source.subscribe(() => {
// important: catch errors from user-provided callbacks
try {
subscriber
.zip(attempt, Observable.range(1, max + 1))
.flatMap((n, i) => {
console.log(n, i);
if (i < max) {
return Observable.timer(1000 * i);
} else {
throw(n);
}
});
} catch (err) {
subscriber.error(err);
}
},
// be sure to handle errors and completions as appropriate and send them along
err => subscriber.error(err),
() => subscriber.complete());
// to return now
return subscription;
});
};
- 我不确定如何为 new 运算符设计 API,哪种语法最适合这里。
- 我不知道如何正确声明新运算符和 Observable 命名空间或模块,以便 TypeScript 识别新内容。
更新的服务电话:
getMocky(){
const u = Math.random();
const okUrl = 'http://www.mocky.io/v2/58ffadf71100009b17f60044';
const erUrl = 'http://www.mocky.io/v2/58ffae7f110000ba17f60046';
return u > 0.6 ? this.http.get(okUrl) : this.http.get(erUrl);
}
我无法直接回答你的问题,因为我不知道如何使用自定义运算符扩展 rxjs。
幸运的是,您(和我)不需要知道。 您真正要问的是如何预定义一个运算符链以在多个地方重用。
你只需要let-Operator,而且使用起来非常简单。
首先将要重用的逻辑提取到 returns 可观察的函数中:
function retryOrThrow(maxAttempts, timeout) {
return (source) =>
source
.retryWhen(attempts => Observable
.zip(attempts, Observable.range(1, maxAttempts + 1))
.flatMap((n, i) => {
if (i < maxAttempts) {
return Observable.timer(timeout * i);
} else {
throw (n);
}
})
);
}
使用效果中的函数使用 let
:
@Effect()
loadData$: Observable<Action> = this.actions$
.ofType(ActionTypes.LOAD_DATA)
.pluck('payload')
.switchMap(params => {
return this.myService.getData(params)
.map(res => new LoadDataCompleteAction(res))
// inject the pre-defined logic with the desired parameters
.let(retryOrThrow(5, 1000))
})
.catch(err => Observable.of(new LoadDataFailed()));
了解 let
功能的最简单方法是查看它的 source-code。这真的很简单,因为它所做的就是将给定的函数应用于源可观察对象。