RxJs - 组合两个可观察对象、结果选择器函数、zip
RxJs - Combine two observables, result selector function, zip
我正在处理一些遗留代码,但存在设备状态更新错误的问题。问题是,旧的 - legacy 状态更新是使用 long polling 完成的,它在 BE 上存在并发问题,无法修复(被弃用)。有时它会忘记更新状态。消息到达但不正确。
我可以从新的 BE 检索请求它的正确状态(该应用程序现在是一个令人讨厌的混合体),但我无法正确地做到这一点。我的第一个想法是使用 whitLatestFrom
但由于显而易见的原因它不起作用。
legacyService.getState<LegacyState>()
.pipe(
subscribeOn(queue),
withLatestFrom(this.realService.getRealStatus()), // request to the new BE
map(([legacy, real]) => {
// This if statement is a must. getRealStatus() returns only partial state, not the whole DeviceState
// If there is a difference in the state from two sources, I create a new object with correct state
if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
return new LegacyState(
legacy.instrument,
legacy.db,
DeviceState.UNSTABLE,
);
}
return event; // states are same, returning state from legacy source
})
)
.subscribe(event => {
this._deviceStateUpdated$.next(event);
});
适用于应用程序 restart/reload,但稍后 real
状态不会更新,因为没有进行新调用,它只是 returns 之前的值。 combineLatest
也是如此。第一个(来自轮询)已更新,第二个只是 returns 先前的值。
问题是:我怎样才能以这种方式组合两个 observable,当第一个 observable 更新时,我也强制更新第二个 observable 的新值? 并且当然,我可以同时处理它们,因为第二个可观察 returns 只是部分状态。我尝试了多张地图 (swtichMap, concatMap, ...
) 但没有成功。
我所说的投影函数更准确地说是结果选择器函数。请参阅 this page 上的示例 3。我会使用的基本结构是这样的:
import { interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';
//whatever your long polling interval is
const polling = interval(10000);
//then use that to collect your states
const collectState = polling
//pipe the interval event
.pipe(
//first you need the real state so you switchMap into that
//I'm assuming this is Rx can treat as an observable, like a promise
switchMap(this.realService.getRealStatus()),
//then you have your real status and you need to use a result selector function
switchMap(() =>
legacyService.getState(),
//at this point in the result selector function you have access to both states
(real, legacy) => {
//so you can apply your logic
if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
return new LegacyState(
legacy.instrument,
legacy.db,
DeviceState.UNSTABLE
);
} else { return legacy }
})
);
我意识到我已经无益地改变了真实/遗留顺序,但你明白了要点。
另一种方法是创建一个间隔可观察对象和一个压缩可观察对象,它们仅在真实状态和遗留状态都已发出时才发出
import { zip, interval } from 'rxjs';
import { switchMap, map } from 'rxjs/operators';
const polling = interval(10000);
const states = zip(
legacyService.getState(),
this.realService.getRealStatus()
);
const collectState = polling
.pipe(
switchMap(states),
map(statesArray => {
//do the work here
})
);
);
希望这里有帮助
以@Tom 的方法为灵感,我得以修复它。虽然我直接在遗留服务上而不是在轮询间隔上通过管道 switchMap
进行了细微差别。
legacyService.getState<LegacyState>()
.pipe(
subscribeOn(queue),
switchMap(() =>
this.realService.getRealStatus(), // request to the new BE
(legacy, real) => {
if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
return new LegacyState(
legacy.instrument,
legacy.db,
DeviceState.UNSTABLE,
);
}
return event; // states are same, returning state from legacy source
}
)
.subscribe(event => {
this._deviceStateUpdated$.next(event);
});
我正在处理一些遗留代码,但存在设备状态更新错误的问题。问题是,旧的 - legacy 状态更新是使用 long polling 完成的,它在 BE 上存在并发问题,无法修复(被弃用)。有时它会忘记更新状态。消息到达但不正确。
我可以从新的 BE 检索请求它的正确状态(该应用程序现在是一个令人讨厌的混合体),但我无法正确地做到这一点。我的第一个想法是使用 whitLatestFrom
但由于显而易见的原因它不起作用。
legacyService.getState<LegacyState>()
.pipe(
subscribeOn(queue),
withLatestFrom(this.realService.getRealStatus()), // request to the new BE
map(([legacy, real]) => {
// This if statement is a must. getRealStatus() returns only partial state, not the whole DeviceState
// If there is a difference in the state from two sources, I create a new object with correct state
if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
return new LegacyState(
legacy.instrument,
legacy.db,
DeviceState.UNSTABLE,
);
}
return event; // states are same, returning state from legacy source
})
)
.subscribe(event => {
this._deviceStateUpdated$.next(event);
});
适用于应用程序 restart/reload,但稍后 real
状态不会更新,因为没有进行新调用,它只是 returns 之前的值。 combineLatest
也是如此。第一个(来自轮询)已更新,第二个只是 returns 先前的值。
问题是:我怎样才能以这种方式组合两个 observable,当第一个 observable 更新时,我也强制更新第二个 observable 的新值? 并且当然,我可以同时处理它们,因为第二个可观察 returns 只是部分状态。我尝试了多张地图 (swtichMap, concatMap, ...
) 但没有成功。
我所说的投影函数更准确地说是结果选择器函数。请参阅 this page 上的示例 3。我会使用的基本结构是这样的:
import { interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';
//whatever your long polling interval is
const polling = interval(10000);
//then use that to collect your states
const collectState = polling
//pipe the interval event
.pipe(
//first you need the real state so you switchMap into that
//I'm assuming this is Rx can treat as an observable, like a promise
switchMap(this.realService.getRealStatus()),
//then you have your real status and you need to use a result selector function
switchMap(() =>
legacyService.getState(),
//at this point in the result selector function you have access to both states
(real, legacy) => {
//so you can apply your logic
if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
return new LegacyState(
legacy.instrument,
legacy.db,
DeviceState.UNSTABLE
);
} else { return legacy }
})
);
我意识到我已经无益地改变了真实/遗留顺序,但你明白了要点。
另一种方法是创建一个间隔可观察对象和一个压缩可观察对象,它们仅在真实状态和遗留状态都已发出时才发出
import { zip, interval } from 'rxjs';
import { switchMap, map } from 'rxjs/operators';
const polling = interval(10000);
const states = zip(
legacyService.getState(),
this.realService.getRealStatus()
);
const collectState = polling
.pipe(
switchMap(states),
map(statesArray => {
//do the work here
})
);
);
希望这里有帮助
以@Tom 的方法为灵感,我得以修复它。虽然我直接在遗留服务上而不是在轮询间隔上通过管道 switchMap
进行了细微差别。
legacyService.getState<LegacyState>()
.pipe(
subscribeOn(queue),
switchMap(() =>
this.realService.getRealStatus(), // request to the new BE
(legacy, real) => {
if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
return new LegacyState(
legacy.instrument,
legacy.db,
DeviceState.UNSTABLE,
);
}
return event; // states are same, returning state from legacy source
}
)
.subscribe(event => {
this._deviceStateUpdated$.next(event);
});