RxJS group by field 和 return new observable
RxJS group by field and return new observable
我遵循接口和Observable<Machine[]>
,我想要实现的是在Observable<Machine[]>
和return映射可观察对象中按机器symbol
属性分组Observable<Order[]>
.
export interface Machine {
symbol: string;
price: number;
amount: number;
id: number;
}
export interface Order {
symbol: string;
machines: OrderMachine[];
}
export interface OrderMachine {
price: number;
amount: number;
id: number;
}
我试过使用 RxJS gropBy 运算符,但它似乎 return 一个一个地分组数组。
machines: Machine[] = [
{ amount: 1, id: 1, symbol: "A", price: 1 },
{ amount: 1, id: 2, symbol: "A", price: 2 }
];
of(machines).pipe(
takeUntil(this.unsubscribe),
mergeMap(res => res),
groupBy(m => m.symbol),
mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
map(x => { // here I have probably wrong model [string, Machine[]]
const orderMachines = x[1].map(y => { return <OrderMachine>{price: y.price, amount: y.amount, id: y.id }})
return <Order>{ symbol: x[0], machines: orderMachines } })
);
结果我有 Observable<Order>
而不是 Observable<Order[]>
。
预期结果模型:
orders: Order[] = [
{
symbol: "A",
machines: [
{ amount: 1, price: 1, id: 1 },
{ amount: 1, price: 2, id: 2 }
]
}
];
这里有一个基于您的方法的可能解决方案,但有一些变化:
const machines = [
{ amount: 1, id: 1, symbol: "A", price: 1 },
{ amount: 1, id: 2, symbol: "A", price: 2 },
{ amount: 1, id: 3, symbol: "B", price: 3 }
];
from(machines) // (1)
.pipe(
// (2)
groupBy((m) => m.symbol),
mergeMap((group) => group.pipe(toArray())),
map((arr) => ({
symbol: arr[0].symbol, // every group has at least one element
machines: arr.map(({ price, amount, id }) => ({
price,
amount,
id
}))
})),
toArray(), // (3)
)
.subscribe(console.log);
(1) 我将 of(machines)
更改为 from(machines)
以便将 machines
中的对象一个一个地发射到流中。在此更改之前,整个数组会立即发出,因此流会中断。
(2) 我从管道中删除了 takeUntil(this.unsubscribe)
和 mergeMap(res => res)
,因为没有理由在您的示例中包含它们。 takeUntil
不会有任何影响,因为流是有限的和同步的。与 mergeMap
一起应用的身份函数 (res => res
) 在流的流中是有意义的,而在您的示例中并非如此。还是您的项目实际上需要这些运算符,因为您有无穷无尽的可观察数据流?
(3) toArray()
是将 Observable<Order>
转换为 Observable<Order[]>
的原因。它会一直等到流结束并立即将所有流值作为数组发出。
编辑:
操作员提到他更需要一个与无限流兼容的解决方案,但因为 toArray
仅适用于有限流,所以上面提供的答案在这种情况下永远不会发出任何东西。
为了解决这个问题,我会避免使用 rxjs 中的 groupBy
。在您需要将一个流分成几组流的其他情况下,它 cvan 是一个非常强大的工具,但在您的情况下,您只是想对一个数组进行分组,并且有更简单的方法。
this.store.pipe(
select(fromOrder.getMachines)
map((arr) =>
// (*) group by symbol
arr.reduce((acc, { symbol, price, amount, id }) => {
acc[symbol] = {
symbol,
machines: (acc[symbol] ? acc[symbol].machines : [])
.concat({ price, amount, id })
};
return acc;
}, {})
),
)
.subscribe((result) =>
// (**)
console.log(Object.values(result))
);
(*) 你可以使用香草 groupBy implementation returns 形状为 {[symbol: string]: Order}
.
的对象
(**) result
是一个对象,但您可以轻松地将其转换为数组,但应用 Object.values(result)
@kruschid 非常感谢您的回复,它可以正常工作,但不幸的是,当我想在我的商店 (ngrx) 中使用它时它不起作用,类型没问题,但它在 mergeMap
方法:
this.store.pipe(select(fromOrder.getMachines),
mergeMap(res => res), // Machine[]
groupBy((m) => m.symbol),
tap(x => console.log(x)), //this shows object GroupedObservable {_isScalar: false, key: "A", groupSubject: Subject, refCountSubscription: GroupBySubscriber}
mergeMap((group) => group.pipe(toArray())),
tap(x => console.log(x)), // this is not printed in console
map((arr) => <Order>({
symbol: arr[0].symbol,
machines: arr.map(({ price, amount, id }) => ({
price,
amount,
id
}))
})),
toArray())) // (3)
我遵循接口和Observable<Machine[]>
,我想要实现的是在Observable<Machine[]>
和return映射可观察对象中按机器symbol
属性分组Observable<Order[]>
.
export interface Machine {
symbol: string;
price: number;
amount: number;
id: number;
}
export interface Order {
symbol: string;
machines: OrderMachine[];
}
export interface OrderMachine {
price: number;
amount: number;
id: number;
}
我试过使用 RxJS gropBy 运算符,但它似乎 return 一个一个地分组数组。
machines: Machine[] = [
{ amount: 1, id: 1, symbol: "A", price: 1 },
{ amount: 1, id: 2, symbol: "A", price: 2 }
];
of(machines).pipe(
takeUntil(this.unsubscribe),
mergeMap(res => res),
groupBy(m => m.symbol),
mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
map(x => { // here I have probably wrong model [string, Machine[]]
const orderMachines = x[1].map(y => { return <OrderMachine>{price: y.price, amount: y.amount, id: y.id }})
return <Order>{ symbol: x[0], machines: orderMachines } })
);
结果我有 Observable<Order>
而不是 Observable<Order[]>
。
预期结果模型:
orders: Order[] = [
{
symbol: "A",
machines: [
{ amount: 1, price: 1, id: 1 },
{ amount: 1, price: 2, id: 2 }
]
}
];
这里有一个基于您的方法的可能解决方案,但有一些变化:
const machines = [
{ amount: 1, id: 1, symbol: "A", price: 1 },
{ amount: 1, id: 2, symbol: "A", price: 2 },
{ amount: 1, id: 3, symbol: "B", price: 3 }
];
from(machines) // (1)
.pipe(
// (2)
groupBy((m) => m.symbol),
mergeMap((group) => group.pipe(toArray())),
map((arr) => ({
symbol: arr[0].symbol, // every group has at least one element
machines: arr.map(({ price, amount, id }) => ({
price,
amount,
id
}))
})),
toArray(), // (3)
)
.subscribe(console.log);
(1) 我将 of(machines)
更改为 from(machines)
以便将 machines
中的对象一个一个地发射到流中。在此更改之前,整个数组会立即发出,因此流会中断。
(2) 我从管道中删除了 takeUntil(this.unsubscribe)
和 mergeMap(res => res)
,因为没有理由在您的示例中包含它们。 takeUntil
不会有任何影响,因为流是有限的和同步的。与 mergeMap
一起应用的身份函数 (res => res
) 在流的流中是有意义的,而在您的示例中并非如此。还是您的项目实际上需要这些运算符,因为您有无穷无尽的可观察数据流?
(3) toArray()
是将 Observable<Order>
转换为 Observable<Order[]>
的原因。它会一直等到流结束并立即将所有流值作为数组发出。
编辑:
操作员提到他更需要一个与无限流兼容的解决方案,但因为 toArray
仅适用于有限流,所以上面提供的答案在这种情况下永远不会发出任何东西。
为了解决这个问题,我会避免使用 rxjs 中的 groupBy
。在您需要将一个流分成几组流的其他情况下,它 cvan 是一个非常强大的工具,但在您的情况下,您只是想对一个数组进行分组,并且有更简单的方法。
this.store.pipe(
select(fromOrder.getMachines)
map((arr) =>
// (*) group by symbol
arr.reduce((acc, { symbol, price, amount, id }) => {
acc[symbol] = {
symbol,
machines: (acc[symbol] ? acc[symbol].machines : [])
.concat({ price, amount, id })
};
return acc;
}, {})
),
)
.subscribe((result) =>
// (**)
console.log(Object.values(result))
);
(*) 你可以使用香草 groupBy implementation returns 形状为 {[symbol: string]: Order}
.
(**) result
是一个对象,但您可以轻松地将其转换为数组,但应用 Object.values(result)
@kruschid 非常感谢您的回复,它可以正常工作,但不幸的是,当我想在我的商店 (ngrx) 中使用它时它不起作用,类型没问题,但它在 mergeMap
方法:
this.store.pipe(select(fromOrder.getMachines),
mergeMap(res => res), // Machine[]
groupBy((m) => m.symbol),
tap(x => console.log(x)), //this shows object GroupedObservable {_isScalar: false, key: "A", groupSubject: Subject, refCountSubscription: GroupBySubscriber}
mergeMap((group) => group.pipe(toArray())),
tap(x => console.log(x)), // this is not printed in console
map((arr) => <Order>({
symbol: arr[0].symbol,
machines: arr.map(({ price, amount, id }) => ({
price,
amount,
id
}))
})),
toArray())) // (3)