Angular - http 拦截器 - http 速率限制器 - 滑动 window
Angular - http interceptors - http rate limiter - sliding window
我有一个需要限制传出 http 请求数量的用例。是的,我在服务器端确实有速率限制器,但前端也需要限制活动 http 请求的数量 too.For 这就是我尝试实现滑动 window 协议的原因在任何时候我都只会有 n 个活动请求。
这种使用 Rxjs 的方法通常效果很好,请参见此处:
https://jsbin.com/pacicubeci/1/edit?js,console,output
但我不清楚如何使用与 http 拦截器相同的逻辑。我的以下尝试在编译时失败并出现以下错误:
类型 'Subscription' 缺少类型 'Observable<HttpEvent>' 的以下属性:_isScalar、source、operator、lift 和 114 个以上。(2740)
这样,我如何才能 return 一个可观察对象并同时在 http 拦截器上维护一个队列?我的方法有缺陷吗?我可以使用 http 拦截器来限制 http 速率吗?
@Injectable()
export class I1 implements HttpInterceptor {
intercept(
req: HttpRequest<any>,
next: HttpHandler
): Observable<HttpEvent<any>> {
const modified = req.clone({ setHeaders: { "Custom-Header-1": "1" } });
return next
.handle(req)
.do((ev: HttpEvent<any>) => {
if (ev instanceof HttpResponse) {
console.log(ev);
}
})
.pipe(
bufferTime(1000, null, 1),
filter(buffer => buffer.length > 0),
concatMap(buffer => of(buffer).pipe(delay(1000)))
)
.subscribe(console.log);
}
}
https://stackblitz.com/edit/angular-interceptors-npqkjp?file=app/interceptors.ts
在您的拦截器上,您返回的是订阅,而不是 Observable。
如果删除 .subscribe(console.log)
行,它应该可以正常编译。订阅由消费者完成。
如果你想 console.log 发射的所有东西,使用 tap(next => ...)
运算符
编辑 - 嗯,它解决了编译错误,但我不确定它是否会如您所愿...我不完全了解拦截器的工作原理。
如果您想了解更多关于拦截器和 HttpClientModule 的工作原理,您可以查看这篇文章:Exploring the HttpClientModule in Angular.
Is my approach flawed?
In this case, the problem is that next.handle
is expected to return an Observable, but by subscribing to it, it returns a Subscription.
为了更好地理解原因,我将粘贴从上面链接的文章中复制的片段:
const obsBE$ = new Observable(obs => {
timer(1000)
.subscribe(() => {
// console.log('%c [OBSERVABLE]', 'color: red;');
obs.next({ response: { data: ['foo', 'bar'] } });
// Stop receiving values!
obs.complete();
})
return () => {
console.warn("I've had enough values!");
}
});
// Composing interceptors the chain
const obsI1$ = obsBE$
.pipe(
tap(() => console.log('%c [i1]', 'color: blue;')),
map(r => ({ ...r, i1: 'intercepted by i1!' }))
);
let retryCnt = 0;
const obsI2$ = obsI1$
.pipe(
tap(() => console.log('%c [i2]', 'color: green;')),
map(r => {
if (++retryCnt <=3) {
throw new Error('err!')
}
return r;
}),
catchError((err, caught) => {
return getRefreshToken()
.pipe(
switchMap(() => /* obsI2$ */caught),
)
})
);
const obsI3$ = obsI2$
.pipe(
tap(() => console.log('%c [i3]', 'color: orange;')),
map(r => ({ ...r, i3: 'intercepted by i3!' }))
);
function getRefreshToken () {
return timer(1500)
.pipe(q
map(() => ({ token: 'TOKEN HERE' })),
);
}
function get () {
return obsI3$
}
get()
.subscribe(console.log)
/*
-->
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
[i3]
{
"response": {
"data": [
"foo",
"bar"
]
},
"i1": "intercepted by i1!",
"i3": "intercepted by i3!"
}
I've had enough values!
*/
要点是拦截器创建某种链,它以负责发出实际请求的可观察对象结束。 This 是链中的最后一个节点:
return new Observable((observer: Observer<HttpEvent<any>>) => {
// Start by setting up the XHR object with request method, URL, and withCredentials flag.
const xhr = this.xhrFactory.build();
xhr.open(req.method, req.urlWithParams);
if (!!req.withCredentials) {
xhr.withCredentials = true;
}
/* ... */
})
how can I return an observable and maintain a queue at the http interceptor at the same time
我认为解决这个问题的方法是创建一个包含队列逻辑的拦截器,并将其 intercept
方法 return 设为 Observable
,以便它可以被订阅至:
const queueSubject = new Subject<Observable>();
const pendingQueue$ = queueSubject.pipe(
// using `mergeAll` because the Subject's `values` are Observables
mergeAll(limit),
share(),
);
intercept (req, next) {
// `next.handle(req)` - it's fine to do this, no request will fire until the observable is subscribed
queueSubject.next(
next.handle(req)
.pipe(
// not interested in `Sent` events
filter(ev => ev instanceof HttpResponse),
filter(resp => resp.url === req.url),
)
);
return pendingQueue$;
}
使用 filter
运算符是因为通过使用 share
,响应将发送给所有订阅者。假设您同步调用 http.get
5 次,因此 share
的 Subject 有 5 个新订阅者,最后一个将收到其响应,但也会收到其他请求的响应。因此,可以使用 filter
来为请求提供正确的响应,在这种情况下,通过将请求 (req.url
) 的 URL 与我们从中获得的 URL 进行比较HttpResponse.url
:
observer.next(new HttpResponse({
body,
headers,
status,
statusText,
url: url || undefined,
}));
现在,我们为什么要使用 share()
?
我们先看一个更简单的例子:
const s = new Subject();
const queue$ = s.pipe(
mergeAll()
)
function intercept (req) {
s.next(of(req));
return queue$
}
// making request 1
intercept({ url: 'req 1' }).subscribe();
// making request 2
intercept({ url: 'req 2' }).subscribe();
// making request 3
intercept({ url: 'req 3' }).subscribe();
此时,主题 s
应该有 3 个订阅者。这是因为当您 return 排队时,您 return s.pipe(...)
并且当您 订阅 时,它与做的相同:
s.pipe(/* ... */).subscribe()
所以,这就是为什么该主题最后会有 3 个订阅者。
现在让我们检查同一个片段,但是 share()
:
const queue$ = s.pipe(
mergeAll(),
share()
);
// making request 1
intercept({ url: 'req 1' }).subscribe();
// making request 2
intercept({ url: 'req 2' }).subscribe();
// making request 3
intercept({ url: 'req 3' }).subscribe();
你订阅请求1后,share
会创建一个Subject实例,之后所有的订阅者都会属于它,而不是属于main Subject s
。因此,s
将只有一个订阅者。这将确保我们正确地实现队列,因为尽管 Subject s
只有一个订阅者,它仍然会接受 s.next()
值,其结果将传递给另一个 subject(那个来自 share()
),它最终会将响应发送给它的所有订阅者。
我有一个需要限制传出 http 请求数量的用例。是的,我在服务器端确实有速率限制器,但前端也需要限制活动 http 请求的数量 too.For 这就是我尝试实现滑动 window 协议的原因在任何时候我都只会有 n 个活动请求。
这种使用 Rxjs 的方法通常效果很好,请参见此处: https://jsbin.com/pacicubeci/1/edit?js,console,output
但我不清楚如何使用与 http 拦截器相同的逻辑。我的以下尝试在编译时失败并出现以下错误:
类型 'Subscription' 缺少类型 'Observable<HttpEvent>' 的以下属性:_isScalar、source、operator、lift 和 114 个以上。(2740)
这样,我如何才能 return 一个可观察对象并同时在 http 拦截器上维护一个队列?我的方法有缺陷吗?我可以使用 http 拦截器来限制 http 速率吗?
@Injectable()
export class I1 implements HttpInterceptor {
intercept(
req: HttpRequest<any>,
next: HttpHandler
): Observable<HttpEvent<any>> {
const modified = req.clone({ setHeaders: { "Custom-Header-1": "1" } });
return next
.handle(req)
.do((ev: HttpEvent<any>) => {
if (ev instanceof HttpResponse) {
console.log(ev);
}
})
.pipe(
bufferTime(1000, null, 1),
filter(buffer => buffer.length > 0),
concatMap(buffer => of(buffer).pipe(delay(1000)))
)
.subscribe(console.log);
}
}
https://stackblitz.com/edit/angular-interceptors-npqkjp?file=app/interceptors.ts
在您的拦截器上,您返回的是订阅,而不是 Observable。
如果删除 .subscribe(console.log)
行,它应该可以正常编译。订阅由消费者完成。
如果你想 console.log 发射的所有东西,使用 tap(next => ...)
运算符
编辑 - 嗯,它解决了编译错误,但我不确定它是否会如您所愿...我不完全了解拦截器的工作原理。
如果您想了解更多关于拦截器和 HttpClientModule 的工作原理,您可以查看这篇文章:Exploring the HttpClientModule in Angular.
Is my approach flawed? In this case, the problem is that
next.handle
is expected to return an Observable, but by subscribing to it, it returns a Subscription.
为了更好地理解原因,我将粘贴从上面链接的文章中复制的片段:
const obsBE$ = new Observable(obs => {
timer(1000)
.subscribe(() => {
// console.log('%c [OBSERVABLE]', 'color: red;');
obs.next({ response: { data: ['foo', 'bar'] } });
// Stop receiving values!
obs.complete();
})
return () => {
console.warn("I've had enough values!");
}
});
// Composing interceptors the chain
const obsI1$ = obsBE$
.pipe(
tap(() => console.log('%c [i1]', 'color: blue;')),
map(r => ({ ...r, i1: 'intercepted by i1!' }))
);
let retryCnt = 0;
const obsI2$ = obsI1$
.pipe(
tap(() => console.log('%c [i2]', 'color: green;')),
map(r => {
if (++retryCnt <=3) {
throw new Error('err!')
}
return r;
}),
catchError((err, caught) => {
return getRefreshToken()
.pipe(
switchMap(() => /* obsI2$ */caught),
)
})
);
const obsI3$ = obsI2$
.pipe(
tap(() => console.log('%c [i3]', 'color: orange;')),
map(r => ({ ...r, i3: 'intercepted by i3!' }))
);
function getRefreshToken () {
return timer(1500)
.pipe(q
map(() => ({ token: 'TOKEN HERE' })),
);
}
function get () {
return obsI3$
}
get()
.subscribe(console.log)
/*
-->
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
[i3]
{
"response": {
"data": [
"foo",
"bar"
]
},
"i1": "intercepted by i1!",
"i3": "intercepted by i3!"
}
I've had enough values!
*/
要点是拦截器创建某种链,它以负责发出实际请求的可观察对象结束。 This 是链中的最后一个节点:
return new Observable((observer: Observer<HttpEvent<any>>) => {
// Start by setting up the XHR object with request method, URL, and withCredentials flag.
const xhr = this.xhrFactory.build();
xhr.open(req.method, req.urlWithParams);
if (!!req.withCredentials) {
xhr.withCredentials = true;
}
/* ... */
})
how can I return an observable and maintain a queue at the http interceptor at the same time
我认为解决这个问题的方法是创建一个包含队列逻辑的拦截器,并将其 intercept
方法 return 设为 Observable
,以便它可以被订阅至:
const queueSubject = new Subject<Observable>();
const pendingQueue$ = queueSubject.pipe(
// using `mergeAll` because the Subject's `values` are Observables
mergeAll(limit),
share(),
);
intercept (req, next) {
// `next.handle(req)` - it's fine to do this, no request will fire until the observable is subscribed
queueSubject.next(
next.handle(req)
.pipe(
// not interested in `Sent` events
filter(ev => ev instanceof HttpResponse),
filter(resp => resp.url === req.url),
)
);
return pendingQueue$;
}
使用 filter
运算符是因为通过使用 share
,响应将发送给所有订阅者。假设您同步调用 http.get
5 次,因此 share
的 Subject 有 5 个新订阅者,最后一个将收到其响应,但也会收到其他请求的响应。因此,可以使用 filter
来为请求提供正确的响应,在这种情况下,通过将请求 (req.url
) 的 URL 与我们从中获得的 URL 进行比较HttpResponse.url
:
observer.next(new HttpResponse({
body,
headers,
status,
statusText,
url: url || undefined,
}));
现在,我们为什么要使用 share()
?
我们先看一个更简单的例子:
const s = new Subject();
const queue$ = s.pipe(
mergeAll()
)
function intercept (req) {
s.next(of(req));
return queue$
}
// making request 1
intercept({ url: 'req 1' }).subscribe();
// making request 2
intercept({ url: 'req 2' }).subscribe();
// making request 3
intercept({ url: 'req 3' }).subscribe();
此时,主题 s
应该有 3 个订阅者。这是因为当您 return 排队时,您 return s.pipe(...)
并且当您 订阅 时,它与做的相同:
s.pipe(/* ... */).subscribe()
所以,这就是为什么该主题最后会有 3 个订阅者。
现在让我们检查同一个片段,但是 share()
:
const queue$ = s.pipe(
mergeAll(),
share()
);
// making request 1
intercept({ url: 'req 1' }).subscribe();
// making request 2
intercept({ url: 'req 2' }).subscribe();
// making request 3
intercept({ url: 'req 3' }).subscribe();
你订阅请求1后,share
会创建一个Subject实例,之后所有的订阅者都会属于它,而不是属于main Subject s
。因此,s
将只有一个订阅者。这将确保我们正确地实现队列,因为尽管 Subject s
只有一个订阅者,它仍然会接受 s.next()
值,其结果将传递给另一个 subject(那个来自 share()
),它最终会将响应发送给它的所有订阅者。