如何使用 Angular/RXJS 对嵌套可观察数组的内部 属性 求和?
How to sum inner property of nested observable arrays, using Angular/RXJS?
我在获取另一个 Observable 中一个 Observable 的内部数字 属性 的总和(或任何减少)时遇到问题。
我有一个“帐户”对象的 Observable 数组 (Observable<AppAccount[]>
)。
export interface AppAccount {
_id?: string;
name: string;
}
还有一个“余额”对象的 Observable 数组,每个对象都有一个 accountId。许多余额可以与一个帐户相关联(sorted/filtered 按日期,但为简洁起见删除了该部分)
export interface AccountBalance {
_id?: string;
accountId: string;
amount: number;
}
我有一个辅助方法,return它只是给定帐户的最后一个 Balance 对象的金额。
getLastAmount(account: AppAccount): Observable<number> {
return this.balanceService.balances$.pipe(
map(balances => {
let last = balances.filter(balance => {
return balance.accountId === account._id;
}).sort().pop();
//console.log(last)
return last ? last.amount : 0;
}),
tap(amount => console.log(`getLastAmount() => ${amount}`)),
);
}
现在我正在尝试编写一个循环遍历帐户的方法,为每个帐户调用 getLastAmount(),然后将它们全部相加并 return 一个 Observable。这是我到目前为止所管理的:
getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
return accounts$.pipe(
map(accounts => from(accounts)),
mergeAll(),
mergeMap(account => this.getLastAmount(account)),
reduce((sum, current) => {
console.log(`${sum} + ${current}`);
return sum + current;
}, 0)
);
}
但这似乎永远不会 return,并且陷入无限循环??
只有一个账户和一个余额关联,余额的 'amount' 为“10”,我从控制台日志中得到:“0 + 10”一遍又一遍,还有网络日志还要确认它正在连续调用 getBalances()。
我走在正确的轨道上吗?有没有更好的办法?为什么这个 RXJS 管道会卡在一个循环中?
编辑:我根据picci的建议做了一些修改:
getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
return accounts$.pipe(
map(accounts => accounts.map(account => this.getLastAmount(account))),
concatMap(balances$ => { console.log('balances$', balances$); return forkJoin(balances$); }),
tap(balances => console.log('balances', balances)),
map(balances => balances.reduce(
(amountSum, amount) => {
console.log(`${amountSum} + ${amount}`)
amountSum = amountSum + amount;
return amountSum
}, 0))
);
}
但这仍然没有 returning,或者管道没有完成?
我在这里做了一个 stackblitz:https://stackblitz.com/edit/angular-rxjs-nested-obsv
如果您检查控制台输出,它似乎不会比 forkJoin 调用更进一步...
嗯 - 首先,我认为您不应该使用那样的可观测值。
如果你只需要 totalBalance
你可以使用这样的东西 (:
private appAcount$ = from<AppAccount[]>([
{ _id: '1', name: 'user-1' },
{ _id: '2', name: 'user-2' },
{ _id: '3', name: 'user-3' },
]);
// this would be your http call
public getBalances(accountId: string): Observable<AccountBalance[]> {
const ab = [
{ _id: '1', accountId: '1', amount: 100 },
{ _id: '2', accountId: '2', amount: 200 },
{ _id: '3', accountId: '2', amount: 300 },
{ _id: '4', accountId: '3', amount: 400 },
{ _id: '5', accountId: '3', amount: 500 },
{ _id: '6', accountId: '3', amount: 600 },
];
return of(ab.filter(x => x.accountId === accountId));
}
lastAmounts$: Observable<AccountBalance[]> = this.appAcount$
.pipe(
switchMap(appAccount =>
this.getBalances(appAccount._id)
.pipe(
// replace this with your date filter
map(balances => [balances[balances.length - 1]])
)
),
scan((acc, c) => [ ...acc, ...c ])
);
totalBalance$ = this.lastAmounts$
.pipe(
map(x => x.reduce((p, c) => p += c.amount, 0))
)
如果你只需要总余额,你可以只订阅 totalBalance$ observable。
不过我要说的是,如果您可以批量获取您拥有的所有 appAccount 的所有 AccountBalance
,我不建议对每个 appAccount 进行 HTTP 调用 - 这样您就可以appAccounts$
和 balances$
.
都使用 combineLatest
如果我没理解错,你可以这样进行
// somehow you start with the observable which returns the array of accounts
const accounts$: Observable<AppAccount[]> = getAccounts$()
// you also set the date you are interested in
const myDate: Moment = getDate()
// now you build the Observable<number> which will emit the sum of the last balance amounts
const amountSum$: Observable<number> = accounts$.pipe(
// you transform an array of accounts in an array of Observable<number> representing the last Balance amount
map((accounts: Account[]) => {
// use the getLastAmount function you have coded
return accounts.map(account => getLastAmount(account, myDate))
}),
// now we trigger the execution of the Observable in parallel using concatMap, which basically mean wait for the source Observable to complete
// and forkJoin which actually executes the Observables in parallel
concatMap(accounts$ => forkJoin(accounts$)),
// now that we have an array of balances, we reduce them to the sum using the Array reduce method
map(balances => balances.reduce(
(amountSum, amount) => {
amountSum = amountSum + amount;
return amountSum
}, 0)
)
)
// eventually you subscribe to the amountSum$ Observable to get the result
amountSum$.subscribe({
next: amountSum => console.log(`The sum of the last balances is: ${amountSum}`),
error: console.err,
complete: () => console.log("I am done")
})
可能还有其他组合会产生相同的结果,但这似乎有效,可以在 this stackblitz 中查看。
如果您对使用 http 调用的 RxJS 的一些常见模式感兴趣,您可能需要阅读 this blog。
我在获取另一个 Observable 中一个 Observable 的内部数字 属性 的总和(或任何减少)时遇到问题。
我有一个“帐户”对象的 Observable 数组 (Observable<AppAccount[]>
)。
export interface AppAccount {
_id?: string;
name: string;
}
还有一个“余额”对象的 Observable 数组,每个对象都有一个 accountId。许多余额可以与一个帐户相关联(sorted/filtered 按日期,但为简洁起见删除了该部分)
export interface AccountBalance {
_id?: string;
accountId: string;
amount: number;
}
我有一个辅助方法,return它只是给定帐户的最后一个 Balance 对象的金额。
getLastAmount(account: AppAccount): Observable<number> {
return this.balanceService.balances$.pipe(
map(balances => {
let last = balances.filter(balance => {
return balance.accountId === account._id;
}).sort().pop();
//console.log(last)
return last ? last.amount : 0;
}),
tap(amount => console.log(`getLastAmount() => ${amount}`)),
);
}
现在我正在尝试编写一个循环遍历帐户的方法,为每个帐户调用 getLastAmount(),然后将它们全部相加并 return 一个 Observable。这是我到目前为止所管理的:
getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
return accounts$.pipe(
map(accounts => from(accounts)),
mergeAll(),
mergeMap(account => this.getLastAmount(account)),
reduce((sum, current) => {
console.log(`${sum} + ${current}`);
return sum + current;
}, 0)
);
}
但这似乎永远不会 return,并且陷入无限循环??
只有一个账户和一个余额关联,余额的 'amount' 为“10”,我从控制台日志中得到:“0 + 10”一遍又一遍,还有网络日志还要确认它正在连续调用 getBalances()。
我走在正确的轨道上吗?有没有更好的办法?为什么这个 RXJS 管道会卡在一个循环中?
编辑:我根据picci的建议做了一些修改:
getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
return accounts$.pipe(
map(accounts => accounts.map(account => this.getLastAmount(account))),
concatMap(balances$ => { console.log('balances$', balances$); return forkJoin(balances$); }),
tap(balances => console.log('balances', balances)),
map(balances => balances.reduce(
(amountSum, amount) => {
console.log(`${amountSum} + ${amount}`)
amountSum = amountSum + amount;
return amountSum
}, 0))
);
}
但这仍然没有 returning,或者管道没有完成? 我在这里做了一个 stackblitz:https://stackblitz.com/edit/angular-rxjs-nested-obsv 如果您检查控制台输出,它似乎不会比 forkJoin 调用更进一步...
嗯 - 首先,我认为您不应该使用那样的可观测值。
如果你只需要 totalBalance
你可以使用这样的东西 (:
private appAcount$ = from<AppAccount[]>([
{ _id: '1', name: 'user-1' },
{ _id: '2', name: 'user-2' },
{ _id: '3', name: 'user-3' },
]);
// this would be your http call
public getBalances(accountId: string): Observable<AccountBalance[]> {
const ab = [
{ _id: '1', accountId: '1', amount: 100 },
{ _id: '2', accountId: '2', amount: 200 },
{ _id: '3', accountId: '2', amount: 300 },
{ _id: '4', accountId: '3', amount: 400 },
{ _id: '5', accountId: '3', amount: 500 },
{ _id: '6', accountId: '3', amount: 600 },
];
return of(ab.filter(x => x.accountId === accountId));
}
lastAmounts$: Observable<AccountBalance[]> = this.appAcount$
.pipe(
switchMap(appAccount =>
this.getBalances(appAccount._id)
.pipe(
// replace this with your date filter
map(balances => [balances[balances.length - 1]])
)
),
scan((acc, c) => [ ...acc, ...c ])
);
totalBalance$ = this.lastAmounts$
.pipe(
map(x => x.reduce((p, c) => p += c.amount, 0))
)
如果你只需要总余额,你可以只订阅 totalBalance$ observable。
不过我要说的是,如果您可以批量获取您拥有的所有 appAccount 的所有 AccountBalance
,我不建议对每个 appAccount 进行 HTTP 调用 - 这样您就可以appAccounts$
和 balances$
.
combineLatest
如果我没理解错,你可以这样进行
// somehow you start with the observable which returns the array of accounts
const accounts$: Observable<AppAccount[]> = getAccounts$()
// you also set the date you are interested in
const myDate: Moment = getDate()
// now you build the Observable<number> which will emit the sum of the last balance amounts
const amountSum$: Observable<number> = accounts$.pipe(
// you transform an array of accounts in an array of Observable<number> representing the last Balance amount
map((accounts: Account[]) => {
// use the getLastAmount function you have coded
return accounts.map(account => getLastAmount(account, myDate))
}),
// now we trigger the execution of the Observable in parallel using concatMap, which basically mean wait for the source Observable to complete
// and forkJoin which actually executes the Observables in parallel
concatMap(accounts$ => forkJoin(accounts$)),
// now that we have an array of balances, we reduce them to the sum using the Array reduce method
map(balances => balances.reduce(
(amountSum, amount) => {
amountSum = amountSum + amount;
return amountSum
}, 0)
)
)
// eventually you subscribe to the amountSum$ Observable to get the result
amountSum$.subscribe({
next: amountSum => console.log(`The sum of the last balances is: ${amountSum}`),
error: console.err,
complete: () => console.log("I am done")
})
可能还有其他组合会产生相同的结果,但这似乎有效,可以在 this stackblitz 中查看。
如果您对使用 http 调用的 RxJS 的一些常见模式感兴趣,您可能需要阅读 this blog。