NgRx effect call API 并循环调度 action

NgRx effect call API and dispatch action in a loop

你好, 我有以下 Json 结构,它作为 UpdateOrders 操作中的有效负载提供。 实际上,我想遍历 reservationsorders,调用 this.orderApiService.updateOrder 服务并调度 UpdateOrderProgress 操作。在 UpdateOrderProgress 操作中,我想提供 numberOfReservationsUpdatedtotalReservations

const reservationOrders = [
    {
        reservationNumber: '22763883',
        orders: [
            {
                orderId: 'O12341',
                amount: 25
            },
            {
                orderId: 'O45321',
                amount: 50
            }
        ]
    },
    {
        reservationNumber: '42345719',
        orders: [
            {
                orderId: 'O12343',
                amount: 75
            }
        ]
    }
];

我有下面的效果来实现,可惜这个效果不起作用,抛出异常

@Effect()
updateOrders$ = this.actions$.pipe(
    ofType<UpdateOrders>(UpdateOrdersActionType.UPDATE_ORDERS),
    filter((action) => !!action.reservationOrders),
    exhaustMap((action) => {
        return combineLatest(action.reservationOrders.map((x, index) => {
            const totalReservations = action.reservationOrders.length;
            const numberOfReservationsUpdated = index + 1;
            return combineLatest(x.orders.map((order) => {
                const orderUpdateRequest: OrderUpdateRequest = {
                    orderId: order.orderId,
                    amount: order.amount
                };
                return this.orderApiService.updateOrder(orderUpdateRequest).pipe(
                    switchMap(() => [new UpdateOrderProgress(numberOfReservationsUpdated, totalReservations)]),
                    catchError((message: string) => of(console.info(message))),
                );
            }))
        }))
    })
);

我怎样才能做到这一点?我缺少哪些 RxJs 运算符?

您可以改用 mergemergeMap 的组合来实现您想要的效果,而不是使用 combineLatest

以下是您的问题陈述 -

  1. 一个动作触发一个可观察的
  2. 这需要触发多个 observables
  3. 这些可观察对象中的每一个都需要触发一些 动作 (UPDATE_ACTION)

实现此目的的一种方法如下 -

const subj = new Subject<number[]>();

const getData$ = (index) => {
  return of({
    index,
    value: 'Some value for ' + index,
  }).pipe(delay(index*1000));
};

const source = subj.pipe(
  filter((x) => !!x),
  exhaustMap((records: number[]) => {
    const dataRequests = records.map((r) => getData$(r));
    return merge(dataRequests);
  }),
  mergeMap((obs) => obs)
);

source.subscribe(console.log);

subj.next([3,1,1,4]); // Each of the value in array simulates a call to an endpoint that'll take i*1000 ms to complete

// OUTPUT - 
// {index: 1, value: "Some value for 1"}
// {index: 1, value: "Some value for 1"}
// {index: 3, value: "Some value for 3"}
// {index: 4, value: "Some value for 4"}

鉴于上述解释,您的代码需要更改为 -

const getOrderRequest$ = (order: OrderUpdateRequest, numberOfReservationsUpdated, totalReservations) => {
    const orderUpdateRequest: OrderUpdateRequest = {
        orderId: order.orderId,
        amount: order.amount
    };
    return this.orderApiService.updateOrder(orderUpdateRequest).pipe(
        switchMap(() => new UpdateOrderProgress(numberOfReservationsUpdated, totalReservations)),
        catchError((message: string) => of(console.info(message))),
    );
}

updateOrders$ = this.actions$.pipe(
    ofType<UpdateOrders>(UpdateOrdersActionType.UPDATE_ORDERS),
    filter((action) => !!action.reservationOrders),
    exhaustMap((action) => {

        const reservationOrders = action.reservationOrders;
        const totalLen = reservationOrders.length
        const allRequests = []
        reservationOrders.forEach((r, index) => {
            r.orders.forEach(order => {
                const req = getOrderRequest$(order, index + 1, totalLen);
                allRequests.push(req);
            });
        });

        return merge(allRequests)
    }), 
    mergeMap(obs=> obs)
);

旁注 - 虽然您的示例中的嵌套可观察对象可能有效,但由于 http 调用的固有性质需要未知的时间才能完成,因此您可能会看到错误的结果。 意思是,按照您编写的方式,您可能会在某些情况下看到 numberOfReservationsUpdated 并不是更新的实际预订数量的准确指示。

更好的方法是在减速器中处理状态信息。基本上,在 UPDATE 操作负载中传递 reservationNumber 并让 reducer 决定有多少请求正在等待完成。这将是系统状态的准确表示。此外,它会将 @effect 中的逻辑简化为单个嵌套可观察对象而不是多个嵌套。