RxJS:如何在 forkJoin 的 pipe 中获取前一个 Observable 的结果

Get the previous observable's result inside a forkJoin pipe (RxJS)

第一次 HTTP 调用用于获取装运数据;如果装运数据不可用,执行将停止并显示错误。

如果装运数据可用,我需要并行调用后续请求。

我使用 forkJoin 发起并行请求。在拿到 forkJoin 的结果之后,如何在 forkJoin 的 map 方法中使用装运数据?

   // Fetch the shipment for the current card, then request order-line
   // allocations and lots together via forkJoin.
   from(getShipment(this.data.CardId))
              .pipe(
                // Drop a null/undefined shipment (loose != matches both).
                filter(shipmentData => shipmentData != undefined),
                // Abort the chain when ShipmentId is null or an empty string.
                tap(shipmentData => {
                  if (shipmentData.ShipmentId === null || shipmentData.ShipmentId === "") {
                    throw "Shipment needs to be connected"
                  }
                }),
                //I NEED TO PASS THIS SHIPMENT DATA TO FORK JOIN PIPE METHOD
                // Identity map: passes shipmentData through unchanged.
                map(shipmentData => shipmentData),                
                concatMap(shipmentData =>
                  forkJoin([
                    getOrderLineAllocationByCardIdObservable(this.data.CardId),
                    getLotsByCardIdObservable(this.data.CardId)
                  ])
                    .pipe(
                      filter(response => response != undefined),
                      // NOTE(review): map invokes its callback with (value, index), so the
                      // second parameter below receives the emission index and shadows the
                      // outer shipmentData provided by the concatMap callback.
                      map(([inventoryData, lotsData],shipmentData) => {
                                               
                       //I NEED TO ACCESS SHIPMENT DATA IN HERE
                       //SHIPMENT DATA NOT AVAILABLE IN HERE
        
                      })
                    )
                ),
                // Catch errors thrown above
                // The error value itself is discarded; EMPTY is returned in its place.
                catchError(error => {                      
                  return EMPTY;
                }),
                // Always finish by Hiding Loading indicator
                finalize(() => this.store.dispatch(setLoadingSpinner({showLoading: false})))
              )
              .subscribe();

只需从 map 函数的参数中删除 shipmentData 即可:外层 concatMap 回调的 shipmentData 已经通过闭包在内层可用,而 map 回调的第二个参数实际上是索引(index),它会遮蔽外层的同名变量:

import {
  of,
  map,
  Observable,
  from,
  filter,
  tap,
  concatMap,
  forkJoin,
  catchError,
  finalize,
} from 'rxjs';

// Open the console in the bottom right to see results.
/**
 * Stand-in for the real shipment lookup: resolves immediately with a fixed
 * sample shipment so the example can run without a backend.
 *
 * @param x - Card id; accepted for signature parity but not used by this stub.
 * @returns A promise that resolves to the sample shipment record.
 */
function getShipment(x: unknown): Promise<{ key: string; ShipmentId: number }> {
  // The value is already available, so no Promise constructor is needed.
  return Promise.resolve({ key: 'hi, im shipmentData', ShipmentId: 989 });
}

/**
 * Stub for the order-line allocation request used by the example.
 *
 * @param x - Card id; ignored by this stub.
 * @returns The observable produced by `of(1)`.
 */
function getOrderLineAllocationByCardIdObservable(x: unknown) {
  return of(1);
}

/**
 * Stub for the lots request used by the example.
 *
 * @param x - Card id; ignored by this stub.
 * @returns The observable produced by `of(2)`.
 */
function getLotsByCardIdObservable(x: unknown) {
  return of(2);
}

// Load the shipment first, then issue the two follow-up requests together and
// combine their results with the shipment.
from(getShipment(1))
  .pipe(
    // Drop a null/undefined shipment (loose != matches both).
    filter((shipmentData) => shipmentData != undefined),
    // Validate the shipment before issuing the follow-up requests.
    tap((shipmentData: any) => {
      if (shipmentData.ShipmentId === null || shipmentData.ShipmentId === '') {
        // Throw a real Error (not a string) so handlers get a message and stack.
        throw new Error('Shipment needs to be connected');
      }
    }),
    concatMap((shipmentData) =>
      // Array form: the rest-argument signature of forkJoin is deprecated.
      forkJoin([
        getOrderLineAllocationByCardIdObservable(666),
        getLotsByCardIdObservable(666),
      ]).pipe(
        filter((response) => response != undefined),
        // No extra parameter is needed: shipmentData is captured from the
        // enclosing concatMap callback. (map's second argument is the index.)
        map(([inventoryData, lotsData]) => {
          console.log(shipmentData);
          return { shipmentData, inventoryData, lotsData };
        })
      )
    ),
    // Catch errors thrown above: report them, then complete without emitting.
    catchError((error: unknown) => {
      console.error(error);
      return of();
    }),
    // Always finish by Hiding Loading indicator
    finalize(() =>
      // this.store.dispatch(setLoadingSpinner({ showLoading: false }))
      console.log('rdy')
    )
  )
  .subscribe();

可运行示例: https://stackblitz.com/edit/rxjs-t5mzxy?file=index.ts