RXJS如何延迟每个可观察的(http请求)并组合所有请求输出
RXJS how to delay each observable (http request) and combine all requests output
我在设置一个 observable 中的 http 请求之间的间隔时遇到问题。
事情是 - 我想在每个 http 请求之间有延迟,等待所有请求完成,并对所有请求的组合数据执行操作。
当然文档数组的长度是未知的。
示例代码:
const documents = [
{
'documentId': 1,
'documentName': 'DocName1'
},
{
'documentId': 2,
'documentName': 'DocName2'
},
];
function saveDocumentService(document) {
document['someAnotherData'] = '123';
return of(document); // in real world http.post()
}
const documentsToSave$ = zip(
documents,
interval(200),
document => {
document['someDataToBeInserted'] = {'data': 123};
return saveDocumentService(document);
}
);
const sub = forkJoin(documentsToSave$).subscribe(documents => {
console.log('All documents uploaded', documents); // array of responses
});
使用这种方法,只输出最后一个值。
谢谢。
您可以使用 rxjs
pipe
运算符将 take
和 interval
组合为
counter: number = 0;
items: string[] = ["one", "tow", "three", "four"];
ngOnInit() {
interval(2000)
.pipe(take(this.items.length))
.subscribe(res => {
console.log(console.log(this.items[this.counter++]));
});
}
编辑: 您需要使用 Observable.create
创建一个 Observable 流,您可以在其中定期使用 setInterval
创建 observable,最后,您可以将其标记为完成。
订阅后,我们将在发出新值和将可观察对象标记为完成后执行单独的方法
obs: Observable<any>;
counter: number = 0;
items: string[] = ["one", "two", "three", "four"];
ngOnInit() {
this.obs = Observable.create(observer => {
let intervalID = setInterval(() => {
observer.next(this.items[this.counter++]);
if (this.counter >= this.items.length) {
clearInterval(intervalID);
observer.complete();
}
}, 1000);
});
this.obs.subscribe(
res => {
console.log(res);
},
err => {
console.log(`Error: ${err}`);
},
() => {
console.log("complete");
}
);
}
Stackblitz 网址:https://stackblitz.com/edit/angular-regular-interval-observables-with-complete
尝试使用 combineLatest
。但请注意,在每个可观察对象至少发出一个值之前,combineLatest 不会发出初始值。您可以查看文档以获取更多信息:
combineLatest
您可以将 merge
与计时器一起使用并忽略所述计时器的输出:
import { of, from, timer } from 'rxjs'
import { concatMap, merge, ignoreElements, tap, toArray } from 'rxjs/operators'
const documents = [
{
'documentId': 1,
'documentName': 'DocName1'
},
{
'documentId': 2,
'documentName': 'DocName2'
},
];
function saveDocumentService(document) {
document['someAnotherData'] = '123';
return of(document)
}
from(documents)
.pipe(
concatMap(url => saveDocumentService(url).pipe(
tap(res => console.log('Saved document...')),
merge(timer(1000).pipe(ignoreElements()))
)),
toArray(),
)
.subscribe(documents => {
console.log('Sub:', documents)
})
因为你有 documentsToSave$
类型 Observable<Observable<any>>
将 forkJoin
替换为 mergeAll
和 toArray
const sub = documentsToSave$.pipe(mergeAll(), toArray()).subscribe(documents => {
console.log('All documents uploaded', documents); // array of responses
});
}
我在设置一个 observable 中的 http 请求之间的间隔时遇到问题。 事情是 - 我想在每个 http 请求之间有延迟,等待所有请求完成,并对所有请求的组合数据执行操作。 当然文档数组的长度是未知的。
示例代码:
const documents = [
{
'documentId': 1,
'documentName': 'DocName1'
},
{
'documentId': 2,
'documentName': 'DocName2'
},
];
function saveDocumentService(document) {
document['someAnotherData'] = '123';
return of(document); // in real world http.post()
}
const documentsToSave$ = zip(
documents,
interval(200),
document => {
document['someDataToBeInserted'] = {'data': 123};
return saveDocumentService(document);
}
);
const sub = forkJoin(documentsToSave$).subscribe(documents => {
console.log('All documents uploaded', documents); // array of responses
});
使用这种方法,只输出最后一个值。
谢谢。
您可以使用 rxjs
pipe
运算符将 take
和 interval
组合为
counter: number = 0;
items: string[] = ["one", "tow", "three", "four"];
ngOnInit() {
interval(2000)
.pipe(take(this.items.length))
.subscribe(res => {
console.log(console.log(this.items[this.counter++]));
});
}
编辑: 您需要使用 Observable.create
创建一个 Observable 流,您可以在其中定期使用 setInterval
创建 observable,最后,您可以将其标记为完成。
订阅后,我们将在发出新值和将可观察对象标记为完成后执行单独的方法
obs: Observable<any>;
counter: number = 0;
items: string[] = ["one", "two", "three", "four"];
ngOnInit() {
this.obs = Observable.create(observer => {
let intervalID = setInterval(() => {
observer.next(this.items[this.counter++]);
if (this.counter >= this.items.length) {
clearInterval(intervalID);
observer.complete();
}
}, 1000);
});
this.obs.subscribe(
res => {
console.log(res);
},
err => {
console.log(`Error: ${err}`);
},
() => {
console.log("complete");
}
);
}
Stackblitz 网址:https://stackblitz.com/edit/angular-regular-interval-observables-with-complete
尝试使用 combineLatest
。但请注意,在每个可观察对象至少发出一个值之前,combineLatest 不会发出初始值。您可以查看文档以获取更多信息:
combineLatest
您可以将 merge
与计时器一起使用并忽略所述计时器的输出:
import { of, from, timer } from 'rxjs'
import { concatMap, merge, ignoreElements, tap, toArray } from 'rxjs/operators'
const documents = [
{
'documentId': 1,
'documentName': 'DocName1'
},
{
'documentId': 2,
'documentName': 'DocName2'
},
];
function saveDocumentService(document) {
document['someAnotherData'] = '123';
return of(document)
}
from(documents)
.pipe(
concatMap(url => saveDocumentService(url).pipe(
tap(res => console.log('Saved document...')),
merge(timer(1000).pipe(ignoreElements()))
)),
toArray(),
)
.subscribe(documents => {
console.log('Sub:', documents)
})
因为你有 documentsToSave$
类型 Observable<Observable<any>>
将 forkJoin
替换为 mergeAll
和 toArray
const sub = documentsToSave$.pipe(mergeAll(), toArray()).subscribe(documents => {
console.log('All documents uploaded', documents); // array of responses
});
}