如何使用 Angular/RXJS 对嵌套可观察数组的内部 属性 求和?

How to sum inner property of nested observable arrays, using Angular/RXJS?

我在获取另一个 Observable 中一个 Observable 的内部数字 属性 的总和(或任何减少)时遇到问题。

我有一个“帐户”对象的 Observable 数组 (Observable<AppAccount[]>)。

export interface AppAccount {
    _id?: string;
    name: string;
}

还有一个“余额”对象的 Observable 数组,每个对象都有一个 accountId。许多余额可以与一个帐户相关联(sorted/filtered 按日期,但为简洁起见删除了该部分)

export interface AccountBalance {
    _id?: string;
    accountId: string;
    amount: number;
}

我有一个辅助方法,return它只是给定帐户的最后一个 Balance 对象的金额。

getLastAmount(account: AppAccount): Observable<number> {
    return this.balanceService.balances$.pipe(
      map(balances => {
        let last = balances.filter(balance => {
          return balance.accountId === account._id;
        }).sort().pop();
        //console.log(last)
        return last ? last.amount : 0;
      }),
      tap(amount => console.log(`getLastAmount() => ${amount}`)),
    );
  }

现在我正在尝试编写一个循环遍历帐户的方法,为每个帐户调用 getLastAmount(),然后将它们全部相加并 return 一个 Observable。这是我到目前为止所管理的:

getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
    return accounts$.pipe(
      map(accounts => from(accounts)),
      mergeAll(),
      mergeMap(account => this.getLastAmount(account)),
      reduce((sum, current) => {
        console.log(`${sum} + ${current}`);
        return sum + current;
      }, 0)
    );
  }

但这似乎永远不会 return,并且陷入无限循环??

只有一个账户和一个余额关联,余额的 'amount' 为“10”,我从控制台日志中得到:“0 + 10”一遍又一遍,还有网络日志还要确认它正在连续调用 getBalances()。

我走在正确的轨道上吗?有没有更好的办法?为什么这个 RXJS 管道会卡在一个循环中?

编辑:我根据picci的建议做了一些修改:

getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
    return accounts$.pipe(
      map(accounts => accounts.map(account => this.getLastAmount(account))),
      concatMap(balances$ => { console.log('balances$', balances$); return forkJoin(balances$); }),
      tap(balances => console.log('balances', balances)),
      map(balances => balances.reduce(
        (amountSum, amount) => {
          console.log(`${amountSum} + ${amount}`)
          amountSum = amountSum + amount;
          return amountSum
        }, 0))
    );
  }

但这仍然没有 returning,或者管道没有完成? 我在这里做了一个 stackblitz:https://stackblitz.com/edit/angular-rxjs-nested-obsv 如果您检查控制台输出,它似乎不会比 forkJoin 调用更进一步...

嗯 - 首先,我认为您不应该使用那样的可观测值。

如果你只需要 totalBalance 你可以使用这样的东西 (:

  private appAcount$ = from<AppAccount[]>([
    { _id: '1', name: 'user-1' },
    { _id: '2', name: 'user-2' },
    { _id: '3', name: 'user-3' },
  ]);

  // this would be your http call
  public getBalances(accountId: string): Observable<AccountBalance[]> {
    const ab = [
      { _id: '1', accountId: '1', amount: 100 },
      { _id: '2', accountId: '2', amount: 200 },
      { _id: '3', accountId: '2', amount: 300 },
      { _id: '4', accountId: '3', amount: 400 },
      { _id: '5', accountId: '3', amount: 500 },
      { _id: '6', accountId: '3', amount: 600 },
    ];

    return of(ab.filter(x => x.accountId === accountId));
  }

  lastAmounts$: Observable<AccountBalance[]> = this.appAcount$
    .pipe(
      switchMap(appAccount => 
        this.getBalances(appAccount._id)
          .pipe(
            // replace this with your date filter
            map(balances => [balances[balances.length - 1]]) 
          )
      ),
      scan((acc, c) => [ ...acc, ...c ])
    );

  totalBalance$ = this.lastAmounts$
    .pipe(
      map(x => x.reduce((p, c) => p += c.amount, 0))
    )

如果你只需要总余额,你可以只订阅 totalBalance$ observable。

不过我要说的是,如果您可以批量获取您拥有的所有 appAccount 的所有 AccountBalance,我不建议对每个 appAccount 进行 HTTP 调用 - 这样您就可以appAccounts$balances$.

都使用 combineLatest

如果我没理解错,你可以这样进行

// somehow you start with the observable which returns the array of accounts
const accounts$: Observable<AppAccount[]> = getAccounts$()
// you also set the date you are interested in
const myDate: Moment = getDate()

// now you build the Observable<number> which will emit the sum of the last balance amounts
const amountSum$: Observable<number> = accounts$.pipe(
  // you transform an array of accounts in an array of Observable<number> representing the last Balance amount
  map((accounts: Account[]) => {
    // use the getLastAmount function you have coded
    return accounts.map(account => getLastAmount(account, myDate))
  }),
  // now we trigger the execution of the Observable in parallel using concatMap, which basically mean wait for the source Observable to complete
  // and forkJoin which actually executes the Observables in parallel
  concatMap(accounts$ => forkJoin(accounts$)),
  // now that we have an array of balances, we reduce them to the sum using the Array reduce method
  map(balances => balances.reduce(
    (amountSum, amount) => {
      amountSum = amountSum + amount;
      return amountSum
    }, 0)
  )
)

// eventually you subscribe to the amountSum$ Observable to get the result
amountSum$.subscribe({
  next: amountSum => console.log(`The sum of the last balances is: ${amountSum}`),
  error: console.err,
  complete: () => console.log("I am done")
})

可能还有其他组合会产生相同的结果,但这似乎有效,可以在 this stackblitz 中查看。

如果您对使用 http 调用的 RxJS 的一些常见模式感兴趣,您可能需要阅读 this blog