RxJS:保留输入顺序的 MergeMap

RxJS: MergeMap with Preserving Input order

要求:

urls = [url1, url2, url3]

并行触发所有 3 个网址并在网址列表的序列中绘制 Dom

 ex: Finished order of urls = [url3, url1, url2]
     when url1 finishes Immediately render the DOM, without waiting for url2
     If url2, url3 finishes before url1, then store url2, url3 and paint the DOM after url1 arrives
     Paint the DOM with order [url1, url2, url3]

我使用 promises 的工作:

// Fired all 3 urls at the same time
p1 = fetch(url1)
p2 = fetch(url2)
p3 = fetch(url3)

p1.then(updateDom)
  .then(() => p2)
  .then(updateDom)
  .then(() => p3)
  .then(updateDom)

我想在 Observables 中做同样的事情。

from(urls)
  .pipe(
      mergeMap(x => fetch(x))
  )

为了并行触发它们,我使用了合并映射,但我如何排序结果的顺序?

你可以用 fetch 和 paint 形成一个序列,然后 forkJoin/Promise.all them

p1 = fetch(url1)
p2 = fetch(url2)
p3 = fetch(url3)

forkJoin(
from(p1).pipe(tap(_=>paint dom...))
from(p1).pipe(tap(_=>paint dom...))
from(p1).pipe(tap(_=>paint dom...))
).subscribe()

我找不到任何保留顺序的东西,所以我想出了一些有点复杂的东西。

const { concat, of, BehaviorSubject, Subject } = rxjs;
const { delay, filter } = rxjs.operators;

const parallelExecute = (...obs$) => {
  const subjects = obs$.map(o$ => {
    const subject$ = new BehaviorSubject();
    const sub = o$.subscribe(o => { subject$.next(o); });
    return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
  });
  const subject$ = new Subject();
  sub(0);
  function sub(index) {
    const current = subjects[index];
    current.obs$.subscribe(c => {
      subject$.next(c);
      current.obs$.complete();
      current.sub.unsubscribe();
      if (index < subjects.length -1) {
        sub(index + 1);
      } else {
        subject$.complete();
      }
    });
  }
  return subject$;
}


parallelExecute(
  of(1).pipe(delay(3000)),
  of(2).pipe(delay(2000)),
  of(3).pipe(delay(1000))
).subscribe(result => { console.log(result); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>

使用这样的异步任务保持顺序的最佳方法是 concatMap

问题是,如果我们单独应用它,就会失去并行化。如果我们要这样做:

from(urls)
  .pipe(
      concatMap(x => fetch(x))
  );

第一个请求完成后才会触发第二个请求。

我们可以通过将映射分离到它自己的运算符中来解决这个问题:

from(urls)
  .pipe(
      map(x => fetch(x)),
      concatMap(x => x)
  );

请求将同时触发,但结果将按请求顺序发出。

请参阅下面的 适用于使用此方法的示例:

const { from } = rxjs;
const { concatMap, map } = rxjs.operators;

function delayPromise(value, delay) {
  return new Promise(resolve => setTimeout(() => resolve(value), delay));
}

var delay = 3;

from([1, 2, 3]).pipe(
  map(x => delayPromise(x, delay-- * 1000)),
  concatMap(x => x)
).subscribe(result => { console.log(result); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>