How to build an observable which calls multiple observables based on previous result?
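I have one endpoint that returns { ids: [1, 2, 3, 45] }, and another that returns the value for a given id: { id: 3, value: 30, active: true }.
I am trying to build an observable that calls the first endpoint, then calls the second endpoint for each returned id, and emits the sum of all values where active = true: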
private getIds(group: string) {
  const url = '...';
  return this.http.get<{ ids: number[] }>(url, { params: { group } });
}
private getValue(id: number) {
  const url = '...';
  return this.http.get<ActiveValue>(url, { params: { id: id.toString() } });
}
public getSum(group: string) {
  let sum = 0;
  const bSubject = new BehaviorSubject(sum);
  const observables = this.getIds(group).pipe(
    mergeMap(({ ids }) => ids),
    map(id => this.getValue(id).pipe(tap(({ value, active }) => {
      if (active) {
        sum += value;
        bSubject.next(sum);
      }
    })))
  );
  const observable = forkJoin(observables).pipe(map(() => sum));
  return { bSubject, observable };
}
interface ActiveValue {
  value: number;
  active: boolean;
}
But it complains:
forkJoin is deprecated: Use the version that takes an array of Observables instead (deprecation)
Also, when I hover over observables, it shows:
const observables: Observable<Observable<ActiveValue>>
...whereas I thought it should be Observable<ActiveValue>[]
How can I make this work?
I'm not sure, but you could try something like this:
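import { HttpClient } from '@angular/common/http';
import { forkJoin } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface ActiveValue {
  value: number;
  active: boolean;
}

// Sum the values of all active entries.
// The parentheses matter: `acc + active ? value : 0` parses as `(acc + active) ? value : 0`.
function countActiveValues(values: ActiveValue[]) {
  return values.reduce((acc, { value, active }) => acc + (active ? value : 0), 0);
}

class MyClass {
  // Assumption: `http` is Angular's HttpClient, as the calls in the question suggest.
  constructor(private http: HttpClient) {}

  private getIds(group: string) {
    const url = '...';
    return this.http.get<{ ids: number[] }>(url, { params: { group } });
  }
  private getValue(id: number) {
    const url = '...';
    return this.http.get<ActiveValue>(url, { params: { id: id.toString() } });
  }
  // Build one request per id; the result is a plain array, which is what forkJoin expects.
  private getRequests(ids: number[]) {
    return ids.map((id) => this.getValue(id));
  }
  public getSum(group: string) {
    return this.getIds(group).pipe(
      map(({ ids }) => this.getRequests(ids)),
      // Wait for every request to complete, then emit all results as one array.
      switchMap((requests) => forkJoin(requests)),
      map((results) => countActiveValues(results))
    );
  }
}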
And don't forget to catch errors for your requests ;)
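A minimal sketch of what catching errors per request could look like. It assumes a failed lookup should be skipped rather than fail the whole sum; treating a failure as an inactive value of 0 is an assumption, not something the question specifies. `getValue` here is a hypothetical stub standing in for the HTTP call, and `getValueSafe` / `getSumSafe` are illustrative names.

```typescript
import { Observable, forkJoin, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

interface ActiveValue {
  value: number;
  active: boolean;
}

// Hypothetical stand-in for the HTTP request: id 2 fails, every other id succeeds.
function getValue(id: number): Observable<ActiveValue> {
  return id === 2
    ? throwError(new Error(`request for id ${id} failed`))
    : of({ value: id * 10, active: true });
}

// Assumption: a failed request is replaced by an inactive value of 0,
// so one bad id does not make forkJoin error out.
function getValueSafe(id: number): Observable<ActiveValue> {
  return getValue(id).pipe(
    catchError(() => of({ value: 0, active: false }))
  );
}

function getSumSafe(ids: number[]): Observable<number> {
  // forkJoin([]) completes without emitting, so handle the empty case explicitly.
  if (ids.length === 0) {
    return of(0);
  }
  return forkJoin(ids.map(id => getValueSafe(id))).pipe(
    map(values =>
      values.reduce((acc, { value, active }) => acc + (active ? value : 0), 0)
    )
  );
}

let safeTotal = -1;
getSumSafe([1, 2, 3]).subscribe(total => (safeTotal = total));
// safeTotal is now 40: ids 1 and 3 contribute 10 and 30, the failed id 2 contributes 0.
```

Placing `catchError` on each inner request, rather than once at the end of the outer pipe, is what lets the remaining requests still count toward the sum.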
Maybe something like this:
// Sample data standing in for the second endpoint.
const data = [
  { id: 1, value: 30, active: true },
  { id: 2, value: 10, active: false },
  { id: 3, value: 5, active: true },
  { id: 45, value: 1, active: false }
];
// Simulates the first endpoint: resolves with the list of ids after 100 ms.
function getIds(group) {
  const promise = new Promise(function(resolve, reject) {
    setTimeout(function() {
      const resp = { ids: [1, 2, 3, 45] };
      resolve(resp.ids);
    }, 100);
  });
  return Rx.Observable.fromPromise(promise);
}
// Simulates the second endpoint: resolves with the entry for the given id.
function getValue(id) {
  const promise = new Promise(function(resolve, reject) {
    setTimeout(function() {
      resolve(data.find(x => x.id === id));
    }, 100);
  });
  return Rx.Observable.fromPromise(promise);
}
// Add the value to the running total only when the entry is active.
const reduceSum = (acc, { active, value }) => acc + (active ? value : 0);
function getSum(group) {
  // Plain function calls: `this` is not needed outside a class.
  return getIds(group)
    .mergeMap(ids => {
      return Rx.Observable.from(ids)
        .mergeMap(id => getValue(id))
        .reduce(reduceSum, 0);
    });
}
getSum('group').subscribe(console.log);
<script src="https://unpkg.com/@reactivex/rxjs@5.5.12/dist/global/Rx.js"></script>
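The snippet above uses the chained operator style of RxJS 5, while the question uses the pipeable style (`.pipe(...)`). A rough pipeable equivalent might look like the following sketch. `fetchIds` and `fetchValue` are hypothetical stubs replacing the HTTP calls, `sumActiveValues` is an illustrative name, and the data is copied from the sample above.

```typescript
import { Observable, from, of } from 'rxjs';
import { mergeMap, reduce } from 'rxjs/operators';

interface ActiveValue {
  id: number;
  value: number;
  active: boolean;
}

// Sample data copied from the snippet above.
const sampleData: ActiveValue[] = [
  { id: 1, value: 30, active: true },
  { id: 2, value: 10, active: false },
  { id: 3, value: 5, active: true },
  { id: 45, value: 1, active: false }
];

// Hypothetical stub for the first endpoint.
function fetchIds(group: string): Observable<{ ids: number[] }> {
  return of({ ids: sampleData.map(d => d.id) });
}

// Hypothetical stub for the second endpoint: emits the matching entry, if any.
function fetchValue(id: number): Observable<ActiveValue> {
  return from(sampleData.filter(d => d.id === id));
}

function sumActiveValues(group: string): Observable<number> {
  return fetchIds(group).pipe(
    // Flatten the id array into one emission per id.
    mergeMap(({ ids }) => from(ids)),
    // Request the value for each id; responses are merged as they arrive.
    mergeMap(id => fetchValue(id)),
    // Emit a single total once every inner request has completed.
    reduce((acc: number, { value, active }: ActiveValue) => acc + (active ? value : 0), 0)
  );
}

let pipedTotal = -1;
sumActiveValues('demo').subscribe(sum => (pipedTotal = sum));
// pipedTotal is now 35: only ids 1 and 3 are active (30 + 5).
```

If intermediate totals are wanted, as with the `BehaviorSubject` in the question, swapping `reduce` for `scan` would emit a running total after each response instead of a single final value.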