如何成对缓冲可观察对象并成对执行它们?

How to buffer observables in pairs and execute them by the pair?

我有一个 xhr 请求正在获取一个数组,我用它来执行后续的 xhr 请求,如下所示:

const Rx = require('rxjs/Rx');
const fetch = require('node-fetch');

const url = `url`;

// Get array of tables
const tables$ = Rx.Observable
  .from(fetch(url).then((r) => r.json()));

// Get array of columns
const columns$ = (table) => {
  return Rx.Observable
    .from(fetch(`${url}/${table.TableName}/columns`).then(r => r.json()));
};

tables$
  .mergeMap(tables => Rx.Observable.forkJoin(...tables.map(columns$)))    
  .subscribe(val => console.log(val));

我想分块执行列请求,这样请求就不会立即发送到服务器。

这个 SO 问题有些方向相同但不完全相同:

现在我正在尝试这样的事情:

tables$
  .mergeMap(tables => Rx.Observable.forkJoin(...tables.map(columns$)))
  .flatMap(e => e)
  .bufferCount(4)
  .executeTheChunksSerial(magic)
  .flatMap(e => e)
  .subscribe(val => console.log(val));

但我无法全神贯注地思考如何串行执行块...

您可以利用 mergeMapconcurrency 参数来让最大 x 请求并发到您的服务器:

const getTables = Promise.resolve([{ tableName: 'foo' },{ tableName: 'bar' },{ tableName: 'baz' }]);
const getColumns = (table) => Rx.Observable.of('a,b,c')
  .do(_ => console.log('getting columns for table: ' + table))
  .delay(250);
      
Rx.Observable.from(getTables)
  .mergeAll()
  .mergeMap(
    table => getColumns(table.tableName),
    (table, columns) => ({ table, columns }),
    2)
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.js"></script>