如果发生其他事件则取消扩展
Cancelling expand if other event occurs
我是 ReactiveX 的新手,一直在尝试使用 RxJS 构建一个“CS:Go 炸弹爆炸通知程序”程序。
到目前为止我有以下内容:
// TimeSubject is the current heartbeat of the game (i.e. current client time),
// it is not emitted every 1 second (it will be emitted at least once every 30
// seconds and also whenever any other game event occurs)
// RoundSubject stores information about the current and previous round state
combineLatest([TimeSubject, RoundSubject])
.pipe(
// detect when the bomb is planted
filter(([_, { current }]) => current.bomb === 'planted'),
// create a timer object to keep track of the time until explosion
map(([time, _]) => ({ plantTime: time, explosionTime: time + 40, currentTime: time })),
// ignore other "bomb planted" events for the next 50 seconds
throttleTime(50 * 1000),
// count down until bomb is exploded
// problem: RoundSubject can emit an event indicating the bomb got defused
// or the round ended before the bomb got to explode,
// but how do I catch that since I am throttling the events from that channel?
expand(({ plantTime, explosionTime, currentTime }) =>
explosionTime > currentTime
? of(({ plantTime, explosionTime, currentTime: currentTime + 1 }))
.pipe(delay(1000))
: EMPTY)
).subscribe(({ plantTime, explosionTime, currentTime }) => {
if (plantTime === currentTime)
console.log('Bomb planted and will explode in 40 seconds!');
else if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
}
});
这里的问题是 RoundSubject
可以发出像 RoundEnded
或 Defused
这样的事件,在任何一种情况下都应该取消计时器。
目前我对可用的运算符了解不多,不知道如何才能很好地解决这个问题。此外,我觉得我的代码与 expand
相当复杂,所以如果您知道更好的方法,请告诉我:-)。
谢谢。
第一:寻求解决方案
这是您可能会做的事情的快速模型:
/**********
* Custom Operator to throttle only specific emissions
*********/
function priorityThrottleTime<T>(
thrTime: number,
priorityStr = "priority"
): MonoTypeOperatorFunction<T> {
return s => defer(() => {
const priorityTimeStamp = new Map<string, number>();
return s.pipe(
filter(v => Date.now() - (
priorityTimeStamp.get(v[priorityStr]) ||
0) >= thrTime
),
tap(v => {
if(v[priorityStr] != null){
priorityTimeStamp.set(
v[priorityStr],
Date.now()
)
}
})
);
});
}
// TimeSubject is the current heartbeat of the game (i.e. current client time)
// RoundSubject stores information about the current and previous round state
roundSubject.pipe(
// detect when the bomb is planted, map to priority: 1,
// otherwise map without priority
map(round =>
round.current.bomb === 'planted' ?
({priority: 1, payload: round}) :
({payload: round})
),
// same prioroty events ("bomb planted" events)
// ignored for the next 50 seconds
priorityThrottleTime(50 * 1000),
// Throttling is done, get our payload back
map(({payload}) => payload),
// create a new observable depending on what the round is doing
switchMap(({current}) =>
current.bomb !== 'planted' ?
EMPTY :
timeSubject.pipe(
// Grab the next heartbeat
take(1),
// create a timer object to keep track of the time until explosion
map(time => ({
plantTime: time,
explosionTime: time + 40,
currentTime: time
})),
// count down until bomb is exploded
expand(({ plantTime, explosionTime, currentTime }) =>
currentTime > explosionTime ?
EMPTY :
of(({
plantTime,
explosionTime,
currentTime: currentTime + 1
})).pipe(delay(1000))
)
)
)
).subscribe(({ plantTime, explosionTime, currentTime }) => {
if (plantTime === currentTime)
console.log('Bomb planted and will explode in 40 seconds!');
else if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
}
});
一些解释
'I want to cancel an ongoing observable based on some upstream event' 模式应始终指向 switchMap
在上面的代码片段中,任何未被限制的事件要么启动新炸弹,要么什么都不做。无论哪种方式,switchMap 都会取消任何正在进行的炸弹(如果有的话)。
您可能会发现有很大的空间可以改变这种行为,但我不确定您想要什么,所以我把它留给您。
优先节流时间
priorityThrottleTime 对于您的需求来说实在是太过分了,但我很久以前就写了它,没有时间进行简化。您可以重写它以采用谓词,并且仅在谓词 returns 为真时进行限制。这样你就可以避免映射进出你上面看到的 ({priority, payload})
对象的麻烦。
展开
Expand 是一个非常臃肿的计时器工具。这不是真正的问题,但我可以通过计算您已经维护的 timeSubject
的 40 秒 oof 来简化它。
这样你就不需要检查每个迭代。
timeSubject.pipe(
// first emissions is time 0,
// then take 40 seconds, then stop
take(41),
// create the timer object
map((currentTime, timePassed) => ({
plantTime: currentTime - timePassed,
explosionTime: 40 - timePassed,
currentTime
}))
)
这确实假设你的 timeSubject 每秒都在抽动,否则你可以像这样改变:
timeSubject.pipe(
take(1),
// create a timer that tics every second
switchMap(plantTime => timer(0, 1000).pipe(
// take 40 seconds, then stop
take(41),
// create a timer object
map(timePassed => ({
plantTime,
explosionTime: plantTime + 40,
currentTime: plantTime + timePassed,
}))
))
)
更新
好的,这是一种新的节流方式,虽然我还没有真正测试过它。它会大大简化你的代码,所以也许值得一试。
function throttleTimeOn<T>(
thrTime: number,
pred: (x:T) => boolean
): MonoTypeOperatorFunction<T> {
return s => defer(() => {
let throttleTimeStamp = 0;
return s.pipe(
filter(v => {
const isThrot = pred(v);
if(!isThrot) return true;
else return Date.now() -
throttleTimeStamp >= thrTime;
}),
tap(v => { if(pred(v)) {
throttleTimeStamp = Date.now();
}})
);
});
}
这里正在使用中:
roundSubject.pipe(
// events that meet predicate ("bomb planted" events)
// ignored for the next 50 seconds
throttleTimeOn(
50 * 1000,
({current}) => current.bomb === 'planted'
),
// create a new observable depending on what the round is doing
switchMap(({current}) =>
current.bomb !== 'planted' ?
EMPTY :
timeSubject.pipe(
// first emissions is time 0,
// then take 40 seconds, then stop
take(41),
// create the timer object
map((currentTime, timePassed) => ({
plantTime: currentTime - timePassed,
explosionTime: 40 - timePassed,
currentTime
}))
)
)
).subscribe(({ plantTime, explosionTime, currentTime }) => {
if (plantTime === currentTime)
console.log('Bomb planted and will explode in 40 seconds!');
else if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
}
});
如果你想捕捉 RoundEnded
或 Defused
事件,你可以做的是创建一个新的流来监听这些事件并在需要时取消计时器。
let cancelStream$ = RoundedSubject.pipe(
filter(({current}) => current.bomb === 'RoundEnded' || current.bomb === 'Defused'),
)
// TimeSubject is the current heartbeat of the game (i.e. current client time)
// RoundSubject stores information about the current and previous round state
combineLatest([TimeSubject, RoundSubject])
.pipe(
takeUntil(cancelStream$),
// detect when the bomb is planted
filter(([_, { current }]) => current.bomb === 'planted'),
// create a timer object to keep track of the time until explosion
map(([time, _]) => ({ plantTime: time, explosionTime: time + 40, currentTime: time })),
// ignore other "bomb planted" events for the next 50 seconds
throttleTime(50 * 1000),
// count down until bomb is exploded
// problem: RoundSubject can emit an event indicating the bomb got defused
// or the round ended before the bomb got to explode,
// but how do I catch that since I am throttling the events from that channel?
expand(({ plantTime, explosionTime, currentTime }) =>
explosionTime > currentTime
? of(({ plantTime, explosionTime, currentTime: currentTime + 1 }))
.pipe(delay(1000))
: EMPTY)
).subscribe(({ plantTime, explosionTime, currentTime }) => {
if (plantTime === currentTime)
console.log('Bomb planted and will explode in 40 seconds!');
else if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
}
});
如有疑问,命名事物!
到目前为止用
RxJS 在单个管道中使用 5 个以上的运算符编写可观察对象。很容易丢剧情
不要害怕创建多个命名流;事情会读得更多
顺其自然。
// This creates a stream that emits every time the bomb's status changes to the
// provided value.
const bombChangedStatusTo = (status) =>
RoundSubject.pipe(
pluck('current'),
distinctUntilKeyChanged('bomb'),
filter((bombStatus) => bombStatus === status)
);
const bombPlanted$ = bombChangedStatusTo('planted');
const bombDefused$ = bombChangedStatusTo('defused');
另一个答案是正确的,expand
在这里有点过分了。假设我们知道开始时间,倒计时可以像映射某个间隔发出的值一样简单(请参阅最后一节,了解为什么我们实际上不需要 plantTime
)。
// we use share() since we'll subscribe to this more than once, it
// ensures that we're subscribing to the exact same interval each time
const clockInterval$ = interval(1000).pipe(
startWith(null), // emit immediately instead of after 1s
map(() => Math.floor(Date.now()/1000)),
share()
);
const countDown = (startTime) =>
clockInterval$.pipe(
map((currentTime) => ({
explosionTime: startTime + 40,
currentTime
})),
takeWhile(
({ currentTime, explosionTime }) => currentTime < explosionTime,
true // include the emission that triggered completion
)
);
这里我们使用exhaustMap
来确保每个“炸弹”只有一个定时器运行
种植”事件(见
docs)。无需
使用 throttleTime
,这将为我们提供两个计数到 40 的计时器,而不仅仅是
一.
const bombClock$ = bombPlanted$.pipe(
withLatestFrom(clockInterval$), // <-- reusing the shared clock
exhaustMap(([_, plantTime]) =>
countDown(plantTime).pipe(
takeUntil(bombDefused$) // stop the timer if the bomb is defused
)
)
);
如果我们使用 bombPlanted$
触发“炸弹被放置”的副作用,我们不会
不再需要在 bombClock$
值
上将 plantTime
作为 属性 传递
bombPlanted$.subscribe(() => {
console.log('Bomb planted and will explode in 40 seconds!');
});
bombClock$.subscribe(({ explosionTime, currentTime }) => {
if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
} else {
console.log('The bomb has exploded');
}
});
我是 ReactiveX 的新手,一直在尝试使用 RxJS 构建一个“CS:Go 炸弹爆炸通知程序”程序。
到目前为止我有以下内容:
// TimeSubject is the current heartbeat of the game (i.e. current client time),
// it is not emitted every 1 second (it will be emitted at least once every 30
// seconds and also whenever any other game event occurs)
// RoundSubject stores information about the current and previous round state
combineLatest([TimeSubject, RoundSubject])
.pipe(
// detect when the bomb is planted
filter(([_, { current }]) => current.bomb === 'planted'),
// create a timer object to keep track of the time until explosion
map(([time, _]) => ({ plantTime: time, explosionTime: time + 40, currentTime: time })),
// ignore other "bomb planted" events for the next 50 seconds
throttleTime(50 * 1000),
// count down until bomb is exploded
// problem: RoundSubject can emit an event indicating the bomb got defused
// or the round ended before the bomb got to explode,
// but how do I catch that since I am throttling the events from that channel?
expand(({ plantTime, explosionTime, currentTime }) =>
explosionTime > currentTime
? of(({ plantTime, explosionTime, currentTime: currentTime + 1 }))
.pipe(delay(1000))
: EMPTY)
).subscribe(({ plantTime, explosionTime, currentTime }) => {
if (plantTime === currentTime)
console.log('Bomb planted and will explode in 40 seconds!');
else if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
}
});
这里的问题是 RoundSubject
可以发出像 RoundEnded
或 Defused
这样的事件,在任何一种情况下都应该取消计时器。
目前我对可用的运算符了解不多,不知道如何才能很好地解决这个问题。此外,我觉得我的代码与 expand
相当复杂,所以如果您知道更好的方法,请告诉我:-)。
谢谢。
第一:寻求解决方案
这是您可能会做的事情的快速模型:
/**********
* Custom Operator to throttle only specific emissions
*********/
function priorityThrottleTime<T>(
thrTime: number,
priorityStr = "priority"
): MonoTypeOperatorFunction<T> {
return s => defer(() => {
const priorityTimeStamp = new Map<string, number>();
return s.pipe(
filter(v => Date.now() - (
priorityTimeStamp.get(v[priorityStr]) ||
0) >= thrTime
),
tap(v => {
if(v[priorityStr] != null){
priorityTimeStamp.set(
v[priorityStr],
Date.now()
)
}
})
);
});
}
// TimeSubject is the current heartbeat of the game (i.e. current client time)
// RoundSubject stores information about the current and previous round state
roundSubject.pipe(
// detect when the bomb is planted, map to priority: 1,
// otherwise map without priority
map(round =>
round.current.bomb === 'planted' ?
({priority: 1, payload: round}) :
({payload: round})
),
// same prioroty events ("bomb planted" events)
// ignored for the next 50 seconds
priorityThrottleTime(50 * 1000),
// Throttling is done, get our payload back
map(({payload}) => payload),
// create a new observable depending on what the round is doing
switchMap(({current}) =>
current.bomb !== 'planted' ?
EMPTY :
timeSubject.pipe(
// Grab the next heartbeat
take(1),
// create a timer object to keep track of the time until explosion
map(time => ({
plantTime: time,
explosionTime: time + 40,
currentTime: time
})),
// count down until bomb is exploded
expand(({ plantTime, explosionTime, currentTime }) =>
currentTime > explosionTime ?
EMPTY :
of(({
plantTime,
explosionTime,
currentTime: currentTime + 1
})).pipe(delay(1000))
)
)
)
).subscribe(({ plantTime, explosionTime, currentTime }) => {
if (plantTime === currentTime)
console.log('Bomb planted and will explode in 40 seconds!');
else if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
}
});
一些解释
'I want to cancel an ongoing observable based on some upstream event' 模式应始终指向 switchMap
在上面的代码片段中,任何未被限制的事件要么启动新炸弹,要么什么都不做。无论哪种方式,switchMap 都会取消任何正在进行的炸弹(如果有的话)。
您可能会发现有很大的空间可以改变这种行为,但我不确定您想要什么,所以我把它留给您。
优先节流时间
priorityThrottleTime 对于您的需求来说实在是太过分了,但我很久以前就写了它,没有时间进行简化。您可以重写它以采用谓词,并且仅在谓词 returns 为真时进行限制。这样你就可以避免映射进出你上面看到的 ({priority, payload})
对象的麻烦。
展开
Expand 是一个非常臃肿的计时器工具。这不是真正的问题,但我可以通过计算您已经维护的 timeSubject
的 40 秒 oof 来简化它。
这样你就不需要检查每个迭代。
timeSubject.pipe(
// first emissions is time 0,
// then take 40 seconds, then stop
take(41),
// create the timer object
map((currentTime, timePassed) => ({
plantTime: currentTime - timePassed,
explosionTime: 40 - timePassed,
currentTime
}))
)
这确实假设你的 timeSubject 每秒都在抽动,否则你可以像这样改变:
timeSubject.pipe(
take(1),
// create a timer that tics every second
switchMap(plantTime => timer(0, 1000).pipe(
// take 40 seconds, then stop
take(41),
// create a timer object
map(timePassed => ({
plantTime,
explosionTime: plantTime + 40,
currentTime: plantTime + timePassed,
}))
))
)
更新
好的,这是一种新的节流方式,虽然我还没有真正测试过它。它会大大简化你的代码,所以也许值得一试。
function throttleTimeOn<T>(
thrTime: number,
pred: (x:T) => boolean
): MonoTypeOperatorFunction<T> {
return s => defer(() => {
let throttleTimeStamp = 0;
return s.pipe(
filter(v => {
const isThrot = pred(v);
if(!isThrot) return true;
else return Date.now() -
throttleTimeStamp >= thrTime;
}),
tap(v => { if(pred(v)) {
throttleTimeStamp = Date.now();
}})
);
});
}
这里正在使用中:
roundSubject.pipe(
// events that meet predicate ("bomb planted" events)
// ignored for the next 50 seconds
throttleTimeOn(
50 * 1000,
({current}) => current.bomb === 'planted'
),
// create a new observable depending on what the round is doing
switchMap(({current}) =>
current.bomb !== 'planted' ?
EMPTY :
timeSubject.pipe(
// first emissions is time 0,
// then take 40 seconds, then stop
take(41),
// create the timer object
map((currentTime, timePassed) => ({
plantTime: currentTime - timePassed,
explosionTime: 40 - timePassed,
currentTime
}))
)
)
).subscribe(({ plantTime, explosionTime, currentTime }) => {
if (plantTime === currentTime)
console.log('Bomb planted and will explode in 40 seconds!');
else if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
}
});
如果你想捕捉 RoundEnded
或 Defused
事件,你可以做的是创建一个新的流来监听这些事件并在需要时取消计时器。
let cancelStream$ = RoundedSubject.pipe(
filter(({current}) => current.bomb === 'RoundEnded' || current.bomb === 'Defused'),
)
// TimeSubject is the current heartbeat of the game (i.e. current client time)
// RoundSubject stores information about the current and previous round state
combineLatest([TimeSubject, RoundSubject])
.pipe(
takeUntil(cancelStream$),
// detect when the bomb is planted
filter(([_, { current }]) => current.bomb === 'planted'),
// create a timer object to keep track of the time until explosion
map(([time, _]) => ({ plantTime: time, explosionTime: time + 40, currentTime: time })),
// ignore other "bomb planted" events for the next 50 seconds
throttleTime(50 * 1000),
// count down until bomb is exploded
// problem: RoundSubject can emit an event indicating the bomb got defused
// or the round ended before the bomb got to explode,
// but how do I catch that since I am throttling the events from that channel?
expand(({ plantTime, explosionTime, currentTime }) =>
explosionTime > currentTime
? of(({ plantTime, explosionTime, currentTime: currentTime + 1 }))
.pipe(delay(1000))
: EMPTY)
).subscribe(({ plantTime, explosionTime, currentTime }) => {
if (plantTime === currentTime)
console.log('Bomb planted and will explode in 40 seconds!');
else if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
}
});
如有疑问,命名事物!
到目前为止用 RxJS 在单个管道中使用 5 个以上的运算符编写可观察对象。很容易丢剧情
不要害怕创建多个命名流;事情会读得更多 顺其自然。
// This creates a stream that emits every time the bomb's status changes to the
// provided value.
const bombChangedStatusTo = (status) =>
RoundSubject.pipe(
pluck('current'),
distinctUntilKeyChanged('bomb'),
filter((bombStatus) => bombStatus === status)
);
const bombPlanted$ = bombChangedStatusTo('planted');
const bombDefused$ = bombChangedStatusTo('defused');
另一个答案是正确的,expand
在这里有点过分了。假设我们知道开始时间,倒计时可以像映射某个间隔发出的值一样简单(请参阅最后一节,了解为什么我们实际上不需要 plantTime
)。
// we use share() since we'll subscribe to this more than once, it
// ensures that we're subscribing to the exact same interval each time
const clockInterval$ = interval(1000).pipe(
startWith(null), // emit immediately instead of after 1s
map(() => Math.floor(Date.now()/1000)),
share()
);
const countDown = (startTime) =>
clockInterval$.pipe(
map((currentTime) => ({
explosionTime: startTime + 40,
currentTime
})),
takeWhile(
({ currentTime, explosionTime }) => currentTime < explosionTime,
true // include the emission that triggered completion
)
);
这里我们使用exhaustMap
来确保每个“炸弹”只有一个定时器运行
种植”事件(见
docs)。无需
使用 throttleTime
,这将为我们提供两个计数到 40 的计时器,而不仅仅是
一.
const bombClock$ = bombPlanted$.pipe(
withLatestFrom(clockInterval$), // <-- reusing the shared clock
exhaustMap(([_, plantTime]) =>
countDown(plantTime).pipe(
takeUntil(bombDefused$) // stop the timer if the bomb is defused
)
)
);
如果我们使用 bombPlanted$
触发“炸弹被放置”的副作用,我们不会
不再需要在 bombClock$
值
plantTime
作为 属性 传递
bombPlanted$.subscribe(() => {
console.log('Bomb planted and will explode in 40 seconds!');
});
bombClock$.subscribe(({ explosionTime, currentTime }) => {
if (explosionTime >= currentTime) {
const secondsToExplode = explosionTime - currentTime;
console.log(`.. explodes in: ${secondsToExplode} seconds`);
} else {
console.log('The bomb has exploded');
}
});