RXJS如何延迟每个可观察的(http请求)并组合所有请求输出

RXJS how to delay each observable (http request) and combine all requests output

我在设置一个 observable 中的 http 请求之间的间隔时遇到问题。 事情是 - 我想在每个 http 请求之间有延迟,等待所有请求完成,并对所有请求的组合数据执行操作。 当然文档数组的长度是未知的。

示例代码:

const documents = [
  {
    'documentId': 1,
    'documentName': 'DocName1'
  },
  {
    'documentId': 2,
    'documentName': 'DocName2'
  },
];

function saveDocumentService(document) {
  document['someAnotherData'] = '123';

  return of(document); // in real world http.post()
}

const documentsToSave$ = zip(
  documents,
  interval(200),
  document => {
    document['someDataToBeInserted'] = {'data': 123};
    return saveDocumentService(document);
  }
);

const sub = forkJoin(documentsToSave$).subscribe(documents => {
  console.log('All documents uploaded', documents); // array of responses
});

使用这种方法,只输出最后一个值。

谢谢。

您可以使用 rxjs pipe 运算符将 takeinterval 组合为

counter: number = 0;
items: string[] = ["one", "tow", "three", "four"];

  ngOnInit() {
    interval(2000)
      .pipe(take(this.items.length))
      .subscribe(res => {
        console.log(console.log(this.items[this.counter++]));
      });
  }

编辑: 您需要使用 Observable.create 创建一个 Observable 流,您可以在其中定期使用 setInterval 创建 observable,最后,您可以将其标记为完成。

订阅后,我们将在发出新值和将可观察对象标记为完成后执行单独的方法

obs: Observable<any>;
counter: number = 0;
items: string[] = ["one", "two", "three", "four"];



ngOnInit() {
    this.obs = Observable.create(observer => {
      let intervalID = setInterval(() => {
        observer.next(this.items[this.counter++]);
        if (this.counter >= this.items.length) {
          clearInterval(intervalID);
          observer.complete();
        }
      }, 1000);
    });

    this.obs.subscribe(
      res => {
        console.log(res);
      },
      err => {
        console.log(`Error: ${err}`);
      },
      () => {
        console.log("complete");
      }
    );
  }

Stackblitz 网址:https://stackblitz.com/edit/angular-regular-interval-observables-with-complete

尝试使用 combineLatest。但请注意,在每个可观察对象至少发出一个值之前,combineLatest 不会发出初始值。您可以查看文档以获取更多信息: combineLatest

您可以将 merge 与计时器一起使用并忽略所述计时器的输出:

import { of, from, timer } from 'rxjs'
import { concatMap, merge, ignoreElements, tap, toArray } from 'rxjs/operators'

const documents = [
  {
    'documentId': 1,
    'documentName': 'DocName1'
  },
  {
    'documentId': 2,
    'documentName': 'DocName2'
  },
];

function saveDocumentService(document) {
  document['someAnotherData'] = '123';
  return of(document)
}
from(documents)
  .pipe(
    concatMap(url => saveDocumentService(url).pipe(
      tap(res => console.log('Saved document...')),
      merge(timer(1000).pipe(ignoreElements()))
    )),
    toArray(),
  )
  .subscribe(documents => {
    console.log('Sub:', documents)
  })

Stackblitz

因为你有 documentsToSave$ 类型 Observable<Observable<any>>forkJoin 替换为 mergeAlltoArray

const sub = documentsToSave$.pipe(mergeAll(), toArray()).subscribe(documents => {
  console.log('All documents uploaded', documents); // array of responses
});
  }

here is working example