如何使用 Rxjs 5 限制服务器请求?
How to throttle a server request with Rxjs 5?
我想创建一个函数,用来向服务器发出 HTTP PUT 请求;但如果该函数在 500 毫秒的时间间隔内被多次调用,则每 500 毫秒只使用最后一次调用的参数发出一次请求,并且如果上一次请求仍在进行中,则取消它。
我研究并提出了这个解决方案:
const { Observable } = require('rxjs/Observable')
const { Subject } = require('rxjs/Subject')
const { switchMap, auditTime } = require('rxjs/operators')
/**
 * Simulated HTTP request: logs the value and echoes it straight back.
 *
 * @param {*} val - Value to "send".
 * @returns {Observable} Observable that, on each subscription, logs
 *   `Request: <val>`, emits `val` once and completes synchronously.
 */
function makeRequest (val) {
  return new Observable(subscriber => {
    console.log('Request:', val);
    subscriber.next(val);
    subscriber.complete();
  });
}
// Subject that receives every value passed to updateThrottle.
const toUpdateStream = new Subject();
// Throttled request pipeline built on top of toUpdateStream.
// NOTE: this observable is not shared, so each notifier$.subscribe() runs its
// own copy of the pipeline below; that is why the printed output shows one
// request per subscriber.
const notifier$ = toUpdateStream.pipe(
// After a value arrives, wait 500 ms and then pass on only the most recent value.
auditTime(500),
// Start a request for that value, dropping any previous request still in flight.
switchMap(val => makeRequest(val))
);
/**
 * Queues `val` for the throttled request and reports the next response.
 *
 * @param {*} val - Value pushed into `toUpdateStream` on subscription.
 * @returns {Observable} Observable that emits the first value produced by
 *   `notifier$` after subscription and then completes.
 */
function updateThrottle (val) {
  return new Observable(subscriber => {
    // Subscribe before pushing the value so its response cannot be missed.
    const responseSub = notifier$.subscribe(response => {
      subscriber.next(response);
      subscriber.complete();
      responseSub.unsubscribe();
    });
    toUpdateStream.next(val);
  });
}
// Issue three updates back to back, each with a different value and its own numbered log label.
[10, 20, 30].forEach((value, index) => {
  const label = (index + 1) + ':';
  updateThrottle(value).subscribe(val => { console.log(label, val); });
});
输出为:
Request: 30
1: 30
Request: 30
2: 30
Request: 30
3: 30
问题是我只需要用 30 调用请求一次,而不是每次都调用。
我能做什么?
所以,你希望只有当值与上一次发出的值不同时,才将其传递给 auditTime。
pairwise
运算符适用于这种场景。
// Variant of the pipeline that drops a value when it is strictly equal to the
// value emitted just before it.
// NOTE(review): startWith, pairwise, filter and map are not among the operators
// imported in the question's snippet; they would need to be imported as well.
const notifier$ = toUpdateStream.pipe(
// Seed the stream so the first real value has a predecessor to be paired with.
startWith(null),
pairwise(), // Emit [previous, current] pairs of values
// Keep only pairs whose current value differs from the previous one.
filter(([oldSearch, newSearch]) => oldSearch !== newSearch),
// Unwrap the pair back to the current value.
map(([oldSearch, newSearch]) => newSearch),
auditTime(500),
switchMap(val => makeRequest(val))
);
你也可以使用 bufferCount(2, 1)
而不是 pairwise()
,它会做同样的事情。
在尝试了一些东西之后,我得到了这个最终解决方案:
const { Observable } = require('rxjs/Observable')
const { Subject } = require('rxjs/Subject')
const { auditTime, switchMap } = require('rxjs/operators')
/**
 * Prints a message prefixed with the current seconds and milliseconds,
 * formatted as `<seconds>.<milliseconds> - <msg>`.
 *
 * @param {*} msg - Message appended after the timestamp.
 */
const log = msg => {
  const now = new Date();
  const stamp = now.getSeconds() + '.' + now.getMilliseconds();
  console.log(stamp + ' - ' + msg);
};
/**
 * Simulated HTTP request that takes 750 ms and can be cancelled.
 *
 * @param {*} val - Value to "send".
 * @returns {Observable} Observable that, 750 ms after subscription, logs the
 *   request, emits the mock response `'R' + val` and completes. Unsubscribing
 *   earlier clears the timer, so nothing is logged or emitted.
 */
function makeRequest (val) {
  return new Observable(subscriber => {
    const respond = () => {
      log('Request: '+ val);
      subscriber.next('R'+ val); // Mock the HTTP response
      subscriber.complete();
    };
    const timerId = setTimeout(respond, 750);
    // Teardown: cancel the pending "request".
    return () => clearTimeout(timerId);
  });
}
// Input side: receives every value passed to updateThrottle.
const toUpdateStream$ = new Subject();
// Output side: re-broadcasts each response to all waiting updateThrottle callers.
const updatedStream$ = new Subject();
// Throttled request pipeline: after a value arrives, wait 500 ms, take the most
// recent value, and start a request for it (dropping any request still in flight).
const filter$ = toUpdateStream$.pipe(
auditTime(500),
switchMap(val => makeRequest(val))
);
// Subscribe to the pipeline exactly once, so a single request is shared by all
// callers. Only `next` values are forwarded to updatedStream$.
filter$.subscribe(val => updatedStream$.next(val));
/**
 * Queues `val` for the throttled request and reports the next response.
 *
 * @param {*} val - Value pushed into `toUpdateStream$` on subscription.
 * @returns {Observable} Observable that emits the next value published on
 *   `updatedStream$` after subscription and then completes. Unsubscribing
 *   before a response arrives releases the internal subscription.
 */
function updateThrottle (val) {
  return Observable.create(observer => {
    // Subscribe before pushing the value so its response cannot be missed.
    const lastUpdate = updatedStream$.subscribe(res => {
      observer.next(res);
      // Completing runs the teardown below, which releases `lastUpdate`.
      observer.complete();
    });
    toUpdateStream$.next(val);
    // Teardown: without it, a caller that unsubscribes before any response
    // arrives would leave this subscription attached to updatedStream$ forever.
    return () => lastUpdate.unsubscribe();
  });
}
log('Start');
// Three calls inside one 500 ms audit window: only the latest value (30) is forwarded to makeRequest
updateThrottle(10).subscribe(val => log('1: '+ val));
updateThrottle(20).subscribe(val => log('2: '+ val));
updateThrottle(30).subscribe(val => log('3: '+ val));
// Push more values at 600 ms, while the 750 ms request for 30 has not finished yet
setTimeout(() => {
// Once the latest of these values (60) is forwarded, switchMap cancels the pending request for 30
updateThrottle(40).subscribe(val => log('4: '+ val));
updateThrottle(50).subscribe(val => log('5: '+ val));
updateThrottle(60).subscribe(val => log('6: '+ val)); // Only this request completes; all six subscribers receive its response
}, 600);
和日志:
43.394 - Start
45.275 - Request: 60
45.277 - 1: R60
45.277 - 2: R60
45.278 - 3: R60
45.278 - 4: R60
45.278 - 5: R60
45.278 - 6: R60
最终只使用最后一个参数发出了一次请求,并将其响应返回给所有侦听器。
我想创建一个函数,用来向服务器发出 HTTP PUT 请求;但如果该函数在 500 毫秒的时间间隔内被多次调用,则每 500 毫秒只使用最后一次调用的参数发出一次请求,并且如果上一次请求仍在进行中,则取消它。
我研究并提出了这个解决方案:
const { Observable } = require('rxjs/Observable')
const { Subject } = require('rxjs/Subject')
const { switchMap, auditTime } = require('rxjs/operators')
/**
 * Simulated HTTP request: logs the value and echoes it straight back.
 *
 * @param {*} val - Value to "send".
 * @returns {Observable} Observable that, on each subscription, logs
 *   `Request: <val>`, emits `val` once and completes synchronously.
 */
function makeRequest (val) {
  return new Observable(subscriber => {
    console.log('Request:', val);
    subscriber.next(val);
    subscriber.complete();
  });
}
// Subject that receives every value passed to updateThrottle.
const toUpdateStream = new Subject();
// Throttled request pipeline built on top of toUpdateStream.
// NOTE: this observable is not shared, so each notifier$.subscribe() runs its
// own copy of the pipeline below; that is why the printed output shows one
// request per subscriber.
const notifier$ = toUpdateStream.pipe(
// After a value arrives, wait 500 ms and then pass on only the most recent value.
auditTime(500),
// Start a request for that value, dropping any previous request still in flight.
switchMap(val => makeRequest(val))
);
/**
 * Queues `val` for the throttled request and reports the next response.
 *
 * @param {*} val - Value pushed into `toUpdateStream` on subscription.
 * @returns {Observable} Observable that emits the first value produced by
 *   `notifier$` after subscription and then completes.
 */
function updateThrottle (val) {
  return new Observable(subscriber => {
    // Subscribe before pushing the value so its response cannot be missed.
    const responseSub = notifier$.subscribe(response => {
      subscriber.next(response);
      subscriber.complete();
      responseSub.unsubscribe();
    });
    toUpdateStream.next(val);
  });
}
// Issue three updates back to back, each with a different value and its own numbered log label.
[10, 20, 30].forEach((value, index) => {
  const label = (index + 1) + ':';
  updateThrottle(value).subscribe(val => { console.log(label, val); });
});
输出为:
Request: 30
1: 30
Request: 30
2: 30
Request: 30
3: 30
问题是我只需要用 30 调用请求一次,而不是每次都调用。
我能做什么?
所以,你希望只有当值与上一次发出的值不同时,才将其传递给 auditTime。
pairwise
运算符适用于这种场景。
// Variant of the pipeline that drops a value when it is strictly equal to the
// value emitted just before it.
// NOTE(review): startWith, pairwise, filter and map are not among the operators
// imported in the question's snippet; they would need to be imported as well.
const notifier$ = toUpdateStream.pipe(
// Seed the stream so the first real value has a predecessor to be paired with.
startWith(null),
pairwise(), // Emit [previous, current] pairs of values
// Keep only pairs whose current value differs from the previous one.
filter(([oldSearch, newSearch]) => oldSearch !== newSearch),
// Unwrap the pair back to the current value.
map(([oldSearch, newSearch]) => newSearch),
auditTime(500),
switchMap(val => makeRequest(val))
);
你也可以使用 bufferCount(2, 1)
而不是 pairwise()
,它会做同样的事情。
在尝试了一些东西之后,我得到了这个最终解决方案:
const { Observable } = require('rxjs/Observable')
const { Subject } = require('rxjs/Subject')
const { auditTime, switchMap } = require('rxjs/operators')
/**
 * Prints a message prefixed with the current seconds and milliseconds,
 * formatted as `<seconds>.<milliseconds> - <msg>`.
 *
 * @param {*} msg - Message appended after the timestamp.
 */
const log = msg => {
  const now = new Date();
  const stamp = now.getSeconds() + '.' + now.getMilliseconds();
  console.log(stamp + ' - ' + msg);
};
/**
 * Simulated HTTP request that takes 750 ms and can be cancelled.
 *
 * @param {*} val - Value to "send".
 * @returns {Observable} Observable that, 750 ms after subscription, logs the
 *   request, emits the mock response `'R' + val` and completes. Unsubscribing
 *   earlier clears the timer, so nothing is logged or emitted.
 */
function makeRequest (val) {
  return new Observable(subscriber => {
    const respond = () => {
      log('Request: '+ val);
      subscriber.next('R'+ val); // Mock the HTTP response
      subscriber.complete();
    };
    const timerId = setTimeout(respond, 750);
    // Teardown: cancel the pending "request".
    return () => clearTimeout(timerId);
  });
}
// Input side: receives every value passed to updateThrottle.
const toUpdateStream$ = new Subject();
// Output side: re-broadcasts each response to all waiting updateThrottle callers.
const updatedStream$ = new Subject();
// Throttled request pipeline: after a value arrives, wait 500 ms, take the most
// recent value, and start a request for it (dropping any request still in flight).
const filter$ = toUpdateStream$.pipe(
auditTime(500),
switchMap(val => makeRequest(val))
);
// Subscribe to the pipeline exactly once, so a single request is shared by all
// callers. Only `next` values are forwarded to updatedStream$.
filter$.subscribe(val => updatedStream$.next(val));
/**
 * Queues `val` for the throttled request and reports the next response.
 *
 * @param {*} val - Value pushed into `toUpdateStream$` on subscription.
 * @returns {Observable} Observable that emits the next value published on
 *   `updatedStream$` after subscription and then completes. Unsubscribing
 *   before a response arrives releases the internal subscription.
 */
function updateThrottle (val) {
  return Observable.create(observer => {
    // Subscribe before pushing the value so its response cannot be missed.
    const lastUpdate = updatedStream$.subscribe(res => {
      observer.next(res);
      // Completing runs the teardown below, which releases `lastUpdate`.
      observer.complete();
    });
    toUpdateStream$.next(val);
    // Teardown: without it, a caller that unsubscribes before any response
    // arrives would leave this subscription attached to updatedStream$ forever.
    return () => lastUpdate.unsubscribe();
  });
}
log('Start');
// Three calls inside one 500 ms audit window: only the latest value (30) is forwarded to makeRequest
updateThrottle(10).subscribe(val => log('1: '+ val));
updateThrottle(20).subscribe(val => log('2: '+ val));
updateThrottle(30).subscribe(val => log('3: '+ val));
// Push more values at 600 ms, while the 750 ms request for 30 has not finished yet
setTimeout(() => {
// Once the latest of these values (60) is forwarded, switchMap cancels the pending request for 30
updateThrottle(40).subscribe(val => log('4: '+ val));
updateThrottle(50).subscribe(val => log('5: '+ val));
updateThrottle(60).subscribe(val => log('6: '+ val)); // Only this request completes; all six subscribers receive its response
}, 600);
和日志:
43.394 - Start
45.275 - Request: 60
45.277 - 1: R60
45.277 - 2: R60
45.278 - 3: R60
45.278 - 4: R60
45.278 - 5: R60
45.278 - 6: R60
最终只使用最后一个参数发出了一次请求,并将其响应返回给所有侦听器。