如何构建一个基于先前结果调用多个可观察对象的可观察对象?

How to build an observable which calls multiple observables based on previous result?

我有一个端点 returns { ids: [1, 2, 3, 45] }

另一个 returns 给定 id 的值:{ id: 3, value: 30, active: true }

我正在尝试构建一个调用第一个端点的可观察对象,并为每个返回的 id 调用第二个端点并发出所有 active = true 值的总和:

private getIds(group: string) {
  const url = '...';
  return this.http.get<{ ids: number[] }>(url, { params: { group } });
}

private getValue(id: number) {
  const url = '...';
  return this.http.get<ActiveValue>(url, { params: { id: id.toString() } });
}

public getSum(group: string) {
  let sum = 0;
  const bSubject = new BehaviorSubject(sum);

  const observables = this.getIds(group).pipe(
    mergeMap(({ ids }) => ids),
    map(id => this.getValue(id).pipe(tap(({ value, active }) => {
      if (active) {
        sum += value;
        bSubject.next(sum);
      }
    })))
  );

  const observable = forkJoin(observables).pipe(map(() => sum));
  return { bSubject, observable };
}

interface ActiveValue {
  value: number;
  active: boolean;
}

但它抱怨:

forkJoin is deprecated: Use the version that takes an array of Observables instead (deprecation)

此外,当我将鼠标悬停在 observables 上时,它显示:

const observables: Observable<Observable<ActiveValue>>

...虽然我认为应该是 Observable<ActiveValue>[]

我怎样才能让它发挥作用?

我不确定,但你可以试试这样的方法

interface ActiveValue {
    value: number;
    active: boolean;
}

function countActiveValues(values: ActiveVale[]) {
    return values.reduce((acc, { value, active }) => acc + active ? value : 0, 0)
}

class MyClass {
    private getIds(group: string) {
        const url = '...';
        return this.http.get < { ids: number[] } > (url, { params: { group } });
    }

    private getValue(id: number) {
        const url = '...';
        return this.http.get < ActiveValue > (url, { params: { id: id.toString() } });
    }

    private getRequests(ids: number[]) {
        return ids.map((id) => this.getValue(id));
    }

    public getSum(group: string) {
        return this.getIds(group).pipe(
            map(({ ids }) => this.getRequests(ids)),
            switchMap((requests) => forkJoin(requests)),
            map((results) => countActiveValues(result))
        );
    }
}

并且不要忘记为您的请求捕获错误 ;)

可能是这样的:

const data = [
  { id: 1, value: 30, active: true },
  { id: 2, value: 10, active: false },
  { id: 3, value: 5, active: true },
  { id: 45, value: 1, active: false }
]

function getIds(group) {
    const promise = new Promise(function(resolve, reject) {
     setTimeout(function() {
        const resp = { ids: [1, 2, 3, 45] }
       resolve(resp.ids);
     }, 100);
   });
   return Rx.Observable.fromPromise(promise)
   
}

function getValue(id) {
        const promise = new Promise(function(resolve, reject) {
     setTimeout(function() {
       resolve(data.find(x => x.id == id));
     }, 100);
   });
   return Rx.Observable.fromPromise(promise)
}

const reduceSum = (acc, {active, value}) => acc += active ? value : 0

function getSum(group) {
  return this.getIds(group)
    .mergeMap(ids => {
        return Rx.Observable.from(ids)
        .mergeMap(id => this.getValue(id))
        .reduce(reduceSum, 0)
    })
}

getSum().subscribe(console.log)
<script src="https://unpkg.com/@reactivex/rxjs@5.5.12/dist/global/Rx.js"></script>