如何使用 Rxjs 5 限制服务器请求?

How to throttle a server request with Rxjs 5?

我想创建一个函数来在服务器中发出 HTTP PUT 请求,但如果该函数在该时间间隔内被多次调用,则每 500 毫秒仅使用最后一次调用参数一次,如果最后一次请求仍在,则取消最后一次请求进步。

我研究并提出了这个解决方案:

const { Observable } = require('rxjs/Observable')
const { Subject } = require('rxjs/Subject')
const { switchMap, auditTime } = require('rxjs/operators')

// Simulate HTTP request
function makeRequest (val) {
  return Observable.create(observer => {
    console.log('Request:', val);
    observer.next(val);
    observer.complete();
  });
}

const toUpdateStream = new Subject();
const notifier$ = toUpdateStream.pipe(
  auditTime(500),
  switchMap(val => makeRequest(val))
);


function updateThrottle (val) {
  return Observable.create(observer => {
    const lastUpdate$ = notifier$.subscribe(res => {
      observer.next(res);
      observer.complete();
      lastUpdate$.unsubscribe();
    });
    toUpdateStream.next(val);
  });
}

// Try to update 3 times with different parameters
updateThrottle(10).subscribe(val => { console.log('1:', val); });
updateThrottle(20).subscribe(val => { console.log('2:', val); });
updateThrottle(30).subscribe(val => { console.log('3:', val); });

输出为:

Request: 30
1: 30
Request: 30
2: 30
Request: 30
3: 30

问题是我只需要用 30 调用请求一次,而不是每次都调用。

我能做什么?

所以你想将值传递给 auditTime 只有当它们从上一次发射发生变化时。 pairwise 运算符适用于此用例。

const notifier$ = toUpdateStream.pipe(
  startWith(null),
  pairwise(), // Emit the previous and current search options
  filter(([oldSearch, newSearch]) => oldSearch !== newSearch),
  map(([oldSearch, newSearch]) => newSearch),
  auditTime(500),
  switchMap(val => makeRequest(val))
);

你也可以使用 bufferCount(2, 1) 而不是 pairwise(),它会做同样的事情。

在尝试了一些东西之后,我得到了这个最终解决方案:

const { Observable } = require('rxjs/Observable')
const { Subject } = require('rxjs/Subject')
const { auditTime, switchMap } = require('rxjs/operators')

// Simulate log with timestamp
const log = msg => {
  const d = new Date();
  console.log(d.getSeconds() +'.'+ d.getMilliseconds() +' - '+ msg);
}

// Simulate HTTP request that takes 750ms
function makeRequest (val) {
  return Observable.create(observer => {
    const timeout = setTimeout(() => {
      log('Request: '+ val);
      observer.next('R'+ val);  // Mock the HTTP response
      observer.complete();
    }, 750);
    return () => clearTimeout(timeout);
  });
}

const toUpdateStream$ = new Subject();
const updatedStream$ = new Subject();
const filter$ = toUpdateStream$.pipe(
  auditTime(500),
  switchMap(val => makeRequest(val))
);
filter$.subscribe(val => updatedStream$.next(val));

function updateThrottle (val) {
  return Observable.create(observer => {
    const lastUpdate = updatedStream$.subscribe(res => {
      observer.next(res);
      observer.complete();
      lastUpdate.unsubscribe();
    });
    toUpdateStream$.next(val);
  });
}

log('Start');

// Try 3 requests and the last one (the 30) gets processed
updateThrottle(10).subscribe(val => log('1: '+ val));
updateThrottle(20).subscribe(val => log('2: '+ val));
updateThrottle(30).subscribe(val => log('3: '+ val));

// Try to make more requests when the current one isn't finished
setTimeout(() => {
  // This one cancels the last one
  updateThrottle(40).subscribe(val => log('4: '+ val));
  updateThrottle(50).subscribe(val => log('5: '+ val));
  updateThrottle(60).subscribe(val => log('6: '+ val)); // This gets processed
}, 600);

和日志:

43.394 - Start
45.275 - Request: 60
45.277 - 1: R60
45.277 - 2: R60
45.278 - 3: R60
45.278 - 4: R60
45.278 - 5: R60
45.278 - 6: R60

只应使用最后一个参数发出一个请求,return 向侦听器发出请求。