RxJS group by field 和 return new observable

RxJS group by field and return new observable

我遵循接口和Observable<Machine[]>,我想要实现的是在Observable<Machine[]>和return映射可观察对象中按机器symbol属性分组Observable<Order[]>.

export interface Machine {
    symbol: string;
    price: number;
    amount: number;
    id: number;
}

export interface Order {
    symbol: string;
    machines: OrderMachine[];
}

export interface OrderMachine {
    price: number;
    amount: number;
    id: number;
}

我试过使用 RxJS gropBy 运算符,但它似乎 return 一个一个地分组数组。

machines: Machine[] = [
        { amount: 1,  id: 1, symbol: "A", price: 1 },
        { amount: 1,  id: 2, symbol: "A", price: 2 }
    ];


of(machines).pipe(
        takeUntil(this.unsubscribe),
        mergeMap(res => res),
        groupBy(m => m.symbol),
        mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
        map(x => { // here I have probably wrong model [string, Machine[]]
            const orderMachines = x[1].map(y => { return <OrderMachine>{price: y.price, amount: y.amount, id: y.id }})
            return <Order>{ symbol: x[0], machines: orderMachines }  })
        );

结果我有 Observable<Order> 而不是 Observable<Order[]>

预期结果模型:

orders: Order[] = [
        {   
            symbol: "A", 
            machines: [
                { amount: 1, price: 1, id: 1 },
                { amount: 1, price: 2, id: 2 }
            ]
        }
    ];

这里有一个基于您的方法的可能解决方案,但有一些变化:

const machines = [
  { amount: 1, id: 1, symbol: "A", price: 1 },
  { amount: 1, id: 2, symbol: "A", price: 2 },
  { amount: 1, id: 3, symbol: "B", price: 3 }
];

from(machines) // (1)
  .pipe(
    // (2)
    groupBy((m) => m.symbol),
    mergeMap((group) => group.pipe(toArray())),
    map((arr) => ({
      symbol: arr[0].symbol, // every group has at least one element
      machines: arr.map(({ price, amount, id }) => ({
        price,
        amount,
        id
      }))
    })),
    toArray(), // (3)
  )
  .subscribe(console.log);

(1) 我将 of(machines) 更改为 from(machines) 以便将 machines 中的对象一个一个地发射到流中。在此更改之前,整个数组会立即发出,因此流会中断。

(2) 我从管道中删除了 takeUntil(this.unsubscribe)mergeMap(res => res),因为没有理由在您的示例中包含它们。 takeUntil 不会有任何影响,因为流是有限的和同步的。与 mergeMap 一起应用的身份函数 (res => res) 在流的流中是有意义的,而在您的示例中并非如此。还是您的项目实际上需要这些运算符,因为您有无穷无尽的可观察数据流?

(3) toArray() 是将 Observable<Order> 转换为 Observable<Order[]> 的原因。它会一直等到流结束并立即将所有流值作为数组发出。

编辑:

操作员提到他更需要一个与无限流兼容的解决方案,但因为 toArray 仅适用于有限流,所以上面提供的答案在这种情况下永远不会发出任何东西。

为了解决这个问题,我会避免使用 rxjs 中的 groupBy。在您需要将一个流分成几组流的其他情况下,它 cvan 是一个非常强大的工具,但在您的情况下,您只是想对一个数组进行分组,并且有更简单的方法。

this.store.pipe(
    select(fromOrder.getMachines)
    map((arr) =>
        // (*) group by symbol
        arr.reduce((acc, { symbol, price, amount, id }) => {
            acc[symbol] = {
                symbol,
                machines: (acc[symbol] ? acc[symbol].machines : [])
                    .concat({ price, amount, id })
            };
            return acc;
        }, {})
    ),
)
.subscribe((result) => 
    // (**)
    console.log(Object.values(result))
);

(*) 你可以使用香草 groupBy implementation returns 形状为 {[symbol: string]: Order}.

的对象

(**) result 是一个对象,但您可以轻松地将其转换为数组,但应用 Object.values(result)

@kruschid 非常感谢您的回复,它可以正常工作,但不幸的是,当我想在我的商店 (ngrx) 中使用它时它不起作用,类型没问题,但它在 mergeMap方法:

   this.store.pipe(select(fromOrder.getMachines),
                     mergeMap(res => res), // Machine[]
                     groupBy((m) => m.symbol),
                     tap(x => console.log(x)), //this shows object GroupedObservable {_isScalar: false, key: "A", groupSubject: Subject, refCountSubscription: GroupBySubscriber} 
                     mergeMap((group) => group.pipe(toArray())),
                     tap(x => console.log(x)), // this is not printed in console
                     map((arr) => <Order>({
                        symbol: arr[0].symbol,
                        machines: arr.map(({ price, amount, id }) => ({
                            price,
                            amount,
                            id
                        }))
                        })),
                     toArray())) // (3)