在 RxJS 的 forkJoin 管道中获取前一个 Observable 的结果
Get previous observable result to fork join pipe RXJS
首先通过 HTTP 调用获取装运数据。
如果装运数据不可用,执行将停止并显示错误。
如果装运数据可用,我需要并行发起后续请求。
我使用 forkJoin 发起并行请求;在拿到 forkJoin 的结果之后,如何在 forkJoin 后面的 map 方法中使用装运数据?
// Question snippet (intentionally left as asked): fetch the shipment for the
// current card, reject it when ShipmentId is null or empty, then combine two
// follow-up lookups with forkJoin.
from(getShipment(this.data.CardId))
.pipe(
// Drop null/undefined results (loose != matches both).
filter(shipmentData => shipmentData != undefined),
tap(shipmentData => {
if (shipmentData.ShipmentId === null || shipmentData.ShipmentId === "") {
// Throwing inside tap turns the check into an error notification that the
// catchError below receives.
throw "Shipment needs to be connected"
}
}),
//I NEED TO PASS THIS SHIPMENT DATA TO FORK JOIN PIPE METHOD
// NOTE(review): identity mapping; the value is passed through unchanged.
map(shipmentData => shipmentData),
concatMap(shipmentData =>
forkJoin([
getOrderLineAllocationByCardIdObservable(this.data.CardId),
getLotsByCardIdObservable(this.data.CardId)
])
.pipe(
filter(response => response != undefined),
// NOTE(review): the second callback parameter below is also named
// shipmentData, so it shadows the concatMap parameter of the same name.
// RxJS map presumably passes the emission index in that position; confirm
// against the RxJS map documentation.
map(([inventoryData, lotsData],shipmentData) => {
//I NEED TO ACCESS SHIPMENT DATA IN HERE
//SHIPMENT DATA NOT AVAILABLE IN HERE
})
)
),
// Catch errors thrown above
// The error value is discarded and EMPTY is returned in its place.
catchError(error => {
return EMPTY;
}),
// Always finish by Hiding Loading indicator
finalize(() => this.store.dispatch(setLoadingSpinner({showLoading: false})))
)
.subscribe();
只需从 map 函数的参数列表中删除 shipmentData,
它在外层 concatMap 回调的作用域中已经可用:
import {
of,
map,
Observable,
from,
filter,
tap,
concatMap,
forkJoin,
catchError,
finalize,
} from 'rxjs';
// Open the console in the bottom right to see results.
/**
 * Stub used by the example in place of a real shipment request.
 *
 * @param x - Card identifier; this stub does not read it.
 * @returns A promise that resolves with a fixed shipment record.
 */
function getShipment(x: unknown): Promise<{ key: string; ShipmentId: number }> {
  // The value is already known, so resolve directly instead of wrapping it in
  // a Promise constructor with an unused reject callback.
  return Promise.resolve({ key: 'hi, im shipmentData', ShipmentId: 989 });
}
/**
 * Stub used by the example: ignores its argument and returns `of(1)`.
 *
 * @param x - Card identifier; not read by this stub.
 * @returns The observable produced by `of(1)`.
 */
function getOrderLineAllocationByCardIdObservable(x) {
  const allocation$ = of(1);
  return allocation$;
}
/**
 * Stub used by the example: ignores its argument and returns `of(2)`.
 *
 * @param x - Card identifier; not read by this stub.
 * @returns The observable produced by `of(2)`.
 */
function getLotsByCardIdObservable(x) {
  const lots$ = of(2);
  return lots$;
}
// Load the shipment first, then combine two follow-up lookups with forkJoin.
// shipmentData stays reachable inside the inner pipe because the inner
// callbacks are defined within the concatMap callback that receives it.
from(getShipment(1))
  .pipe(
    filter((shipmentData) => shipmentData != undefined),
    tap((shipmentData: any) => {
      if (shipmentData.ShipmentId === null || shipmentData.ShipmentId === '') {
        // Throw a real Error (not a bare string) so the failure carries a stack trace.
        throw new Error('Shipment needs to be connected');
      }
    }),
    concatMap((shipmentData) =>
      // Sources are passed as a single array; the variadic call form is
      // deprecated in RxJS 7.
      forkJoin([
        getOrderLineAllocationByCardIdObservable(666),
        getLotsByCardIdObservable(666),
      ]).pipe(
        filter((response) => response != undefined),
        map(([inventoryData, lotsData]) => {
          // No parameter named shipmentData is declared here, so this reads
          // the value received by the enclosing concatMap callback.
          console.log(shipmentData);
        })
      )
    ),
    // Catch errors thrown above: report them instead of discarding them
    // silently, then return an observable created with no values.
    catchError((error) => {
      console.error(error);
      return of();
    }),
    // Always finish by hiding the loading indicator
    finalize(() =>
      // this.store.dispatch(setLoadingSpinner({ showLoading: false }))
      console.log('rdy')
    )
  )
  .subscribe();
可运行示例:
https://stackblitz.com/edit/rxjs-t5mzxy?file=index.ts
首先通过 HTTP 调用获取装运数据。如果装运数据不可用,执行将停止并显示错误。
如果装运数据可用,我需要并行发起后续请求。
我使用 forkJoin 发起并行请求;在拿到 forkJoin 的结果之后,如何在 forkJoin 后面的 map 方法中使用装运数据?
// Question snippet (intentionally left as asked): fetch the shipment for the
// current card, reject it when ShipmentId is null or empty, then combine two
// follow-up lookups with forkJoin.
from(getShipment(this.data.CardId))
.pipe(
// Drop null/undefined results (loose != matches both).
filter(shipmentData => shipmentData != undefined),
tap(shipmentData => {
if (shipmentData.ShipmentId === null || shipmentData.ShipmentId === "") {
// Throwing inside tap turns the check into an error notification that the
// catchError below receives.
throw "Shipment needs to be connected"
}
}),
//I NEED TO PASS THIS SHIPMENT DATA TO FORK JOIN PIPE METHOD
// NOTE(review): identity mapping; the value is passed through unchanged.
map(shipmentData => shipmentData),
concatMap(shipmentData =>
forkJoin([
getOrderLineAllocationByCardIdObservable(this.data.CardId),
getLotsByCardIdObservable(this.data.CardId)
])
.pipe(
filter(response => response != undefined),
// NOTE(review): the second callback parameter below is also named
// shipmentData, so it shadows the concatMap parameter of the same name.
// RxJS map presumably passes the emission index in that position; confirm
// against the RxJS map documentation.
map(([inventoryData, lotsData],shipmentData) => {
//I NEED TO ACCESS SHIPMENT DATA IN HERE
//SHIPMENT DATA NOT AVAILABLE IN HERE
})
)
),
// Catch errors thrown above
// The error value is discarded and EMPTY is returned in its place.
catchError(error => {
return EMPTY;
}),
// Always finish by Hiding Loading indicator
finalize(() => this.store.dispatch(setLoadingSpinner({showLoading: false})))
)
.subscribe();
只需从 map 函数的参数列表中删除 shipmentData,
它在外层 concatMap 回调的作用域中已经可用:
import {
of,
map,
Observable,
from,
filter,
tap,
concatMap,
forkJoin,
catchError,
finalize,
} from 'rxjs';
// Open the console in the bottom right to see results.
/**
 * Stub used by the example in place of a real shipment request.
 *
 * @param x - Card identifier; this stub does not read it.
 * @returns A promise that resolves with a fixed shipment record.
 */
function getShipment(x: unknown): Promise<{ key: string; ShipmentId: number }> {
  // The value is already known, so resolve directly instead of wrapping it in
  // a Promise constructor with an unused reject callback.
  return Promise.resolve({ key: 'hi, im shipmentData', ShipmentId: 989 });
}
/**
 * Stub used by the example: ignores its argument and returns `of(1)`.
 *
 * @param x - Card identifier; not read by this stub.
 * @returns The observable produced by `of(1)`.
 */
function getOrderLineAllocationByCardIdObservable(x) {
  const allocation$ = of(1);
  return allocation$;
}
/**
 * Stub used by the example: ignores its argument and returns `of(2)`.
 *
 * @param x - Card identifier; not read by this stub.
 * @returns The observable produced by `of(2)`.
 */
function getLotsByCardIdObservable(x) {
  const lots$ = of(2);
  return lots$;
}
// Load the shipment first, then combine two follow-up lookups with forkJoin.
// shipmentData stays reachable inside the inner pipe because the inner
// callbacks are defined within the concatMap callback that receives it.
from(getShipment(1))
  .pipe(
    filter((shipmentData) => shipmentData != undefined),
    tap((shipmentData: any) => {
      if (shipmentData.ShipmentId === null || shipmentData.ShipmentId === '') {
        // Throw a real Error (not a bare string) so the failure carries a stack trace.
        throw new Error('Shipment needs to be connected');
      }
    }),
    concatMap((shipmentData) =>
      // Sources are passed as a single array; the variadic call form is
      // deprecated in RxJS 7.
      forkJoin([
        getOrderLineAllocationByCardIdObservable(666),
        getLotsByCardIdObservable(666),
      ]).pipe(
        filter((response) => response != undefined),
        map(([inventoryData, lotsData]) => {
          // No parameter named shipmentData is declared here, so this reads
          // the value received by the enclosing concatMap callback.
          console.log(shipmentData);
        })
      )
    ),
    // Catch errors thrown above: report them instead of discarding them
    // silently, then return an observable created with no values.
    catchError((error) => {
      console.error(error);
      return of();
    }),
    // Always finish by hiding the loading indicator
    finalize(() =>
      // this.store.dispatch(setLoadingSpinner({ showLoading: false }))
      console.log('rdy')
    )
  )
  .subscribe();
可运行示例: https://stackblitz.com/edit/rxjs-t5mzxy?file=index.ts