RxJS Filtered/Grouped 去抖动
RxJS Filtered/Grouped Debounce
我正在尝试使用 RxJS 去抖动运算符,但我想自定义何时对来自源的发射进行去抖动。
默认情况下,任何 去抖window 内源的发射将导致先前的发射被丢弃。我希望 只有来自源的某些 发射计入去抖动操作,基于源发射的值。
假设我有一个对象的可观察对象,如下所示:
{
priority: 'low' //can be 'low' or 'medium' or 'high
}
我希望去抖按对象的优先级分组。这意味着只有当一个发射具有相同的优先级时,它才会被另一个发射去抖动。
即只有 'low'
排放可以消除 'low'
排放,只有 'high'
排放可以消除 'high'
排放。如果在等待 'low'
发射时出现 'medium'
发射,它不会导致 'low'
发射被丢弃。
这意味着如果我快速连续地进行 'low'
发射和 'medium'
发射,两者都会通过。如果我有两个 'low'
快速连续发射,只有最后一个会通过。
这是我想出的:
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000
$source.pipe(
mergeMap(value => {
// We start a race of the value with a delay versus any other emissions from the source with the same priority
return race(
timer(delay).pipe(mapTo(value)),
$source.pipe(
filter(v => v.priority === value.priority),
)
).pipe(
take(1),
// If another emission with the same priority comes before the delay, the second racer it will win the race.
// If no emission with the same priority comes, the first racer will win.
//
// If the first racer wins, this equality check is satisfied and the value is passed through.
// If the second racer wins, the equality check fails and no value is emitted. Since this is a mergeMap, this whole process will start again for that emission.
filter(v => v === value),
)
})
)
我 认为 以上是正确的,但我想知道我是否遗漏了什么或使这种方式变得比需要的更复杂?上面的代码应该像合并三个独立的流 $low.pipe(debounceTime(delay))
$medium.pipe(debounceTime(delay))
和 $high.pipe(debounceTime(delay))
.
一样运行
谢谢!!
我认为你的回答有效。这也很清楚。但是,您必须确保您的 $source
是多播的。
我看到你的方法有一个缺点:
你做了很多额外的计算。如果您每秒去抖动 1000 个值,它可能会明显变慢,具体取决于它所在的位置 运行.
每个流值都可以在任意数量的比赛中。来自不同优先级的输入仍然相互竞争,当下一个值开始竞争时,前一个竞争并没有停止,所以如果有很多值同时到达,你可以得到 timers/races 的爆炸。
设置和删除了很多额外的计时器。在您的情况下,您最多需要三个计时器,每个计时器都会在具有相同优先级的新值到达时重置。
如果您的代码不在关键路径上,那可能不是问题。否则,还有其他方法。不过,我想出的那个在代码方面有点笨重。
对流进行分区
这是我的大脑如何解决这个问题的。我创建了一个运算符来执行 RxJS partition
运算符所做的事情,但允许您划分为两个以上的流。
我的方法在内部处理多播,因此源可以是任何内容(热、冷、多播或非多播)。它(在内部)为每个流设置一个主题,然后您可以像往常一样使用 RxJS 的 debounceTime。
虽然有一个缺点。在您的方法中,您可以随意添加一个新的优先级字符串,它应该会继续工作。 {priority: "DucksSayQuack"} 的对象将相互去抖并且不会影响其他优先级。这甚至可以即时完成。
下面的partitionOn
运算符需要提前知道分区。对于您描述的情况,它应该具有相同的输出并且启动效率更高。
这样更好吗?我不知道,这是解决同一问题的一种有趣且不同的方法。此外,我认为 partitionOn
运算符的用途比分区去抖动更多。
运营商
/***
* Create a partitioned stream for each value where a passed
* predicate returns true
***/
function partitionOn<T>(
input$: Observable<T>,
predicates: ((v:T) => boolean)[]
): Observable<T>[] {
const partitions = predicates.map(predicate => ({
predicate,
stream: new Subject<T>()
}));
input$.subscribe({
next: (v:T) => partitions.forEach(prt => {
if(prt.predicate(v)){
prt.stream.next(v);
}
}),
complete: () => partitions.forEach(prt => prt.stream.complete()),
error: err => partitions.forEach(prt => prt.stream.error(err))
});
return partitions.map(prt => prt.stream.asObservable());
}
使用partitionOn进行优先去抖
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000;
const priorityEquals = a => b => a === b?.priority;
merge(
...partitionOn(
$source,
[priorityEquals('low'),
priorityEquals('medium'),
priorityEquals('high')]
).map(s => s.pipe(
debounceTime(1000)
))
);
为您的流添加时间戳
这种方法与您的方法非常相似,可以让您再次随意使用优先级字符串。这有一个类似的问题,即每个值都被放入计时器中,并且计时器不会在新值到达时被取消。
但是,使用这种方法,取消不必要的计时器的途径就更加清晰了。您可以将订阅对象与时间戳一起存储在 priorityTimeStamp
映射中,并确保在新值到达时取消订阅。
我真的不知道这可能会对性能造成什么影响,我认为 JavaScript 的事件循环很漂亮 robust/efficient。这种方法的好处是您无需支付多播的费用。这实际上只是一个流,使用查找映射来决定过滤什么,不过滤什么。
priorityDebounceTime 运算符
function priorityDebounceTime<T>(
dbTime: number,
priorityStr = "priority"
): MonoTypeOperatorFunction<T> {
return s => defer(() => {
const priorityTimeStamp = new Map<string, number>();
return s.pipe(
mergeMap(v => {
priorityTimeStamp.set(v[priorityStr], Date.now());
return timer(dbTime).pipe(
timestamp(),
filter(({timestamp}) =>
timestamp - priorityTimeStamp.get(v[priorityStr]) >= dbTime
),
mapTo(v)
)
})
)
});
}
使用 priorityDebounceTime 进行优先去抖
这个明显简单了一点:
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 5000;
$source.pipe(
priorityDebounceTime(delay)
).subscribe(console.log);
我正在尝试使用 RxJS 去抖动运算符,但我想自定义何时对来自源的发射进行去抖动。
默认情况下,任何 去抖window 内源的发射将导致先前的发射被丢弃。我希望 只有来自源的某些 发射计入去抖动操作,基于源发射的值。
假设我有一个对象的可观察对象,如下所示:
{
priority: 'low' //can be 'low' or 'medium' or 'high
}
我希望去抖按对象的优先级分组。这意味着只有当一个发射具有相同的优先级时,它才会被另一个发射去抖动。
即只有 'low'
排放可以消除 'low'
排放,只有 'high'
排放可以消除 'high'
排放。如果在等待 'low'
发射时出现 'medium'
发射,它不会导致 'low'
发射被丢弃。
这意味着如果我快速连续地进行 'low'
发射和 'medium'
发射,两者都会通过。如果我有两个 'low'
快速连续发射,只有最后一个会通过。
这是我想出的:
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000
$source.pipe(
mergeMap(value => {
// We start a race of the value with a delay versus any other emissions from the source with the same priority
return race(
timer(delay).pipe(mapTo(value)),
$source.pipe(
filter(v => v.priority === value.priority),
)
).pipe(
take(1),
// If another emission with the same priority comes before the delay, the second racer it will win the race.
// If no emission with the same priority comes, the first racer will win.
//
// If the first racer wins, this equality check is satisfied and the value is passed through.
// If the second racer wins, the equality check fails and no value is emitted. Since this is a mergeMap, this whole process will start again for that emission.
filter(v => v === value),
)
})
)
我 认为 以上是正确的,但我想知道我是否遗漏了什么或使这种方式变得比需要的更复杂?上面的代码应该像合并三个独立的流 $low.pipe(debounceTime(delay))
$medium.pipe(debounceTime(delay))
和 $high.pipe(debounceTime(delay))
.
谢谢!!
我认为你的回答有效。这也很清楚。但是,您必须确保您的 $source
是多播的。
我看到你的方法有一个缺点:
你做了很多额外的计算。如果您每秒去抖动 1000 个值,它可能会明显变慢,具体取决于它所在的位置 运行.
每个流值都可以在任意数量的比赛中。来自不同优先级的输入仍然相互竞争,当下一个值开始竞争时,前一个竞争并没有停止,所以如果有很多值同时到达,你可以得到 timers/races 的爆炸。
设置和删除了很多额外的计时器。在您的情况下,您最多需要三个计时器,每个计时器都会在具有相同优先级的新值到达时重置。
如果您的代码不在关键路径上,那可能不是问题。否则,还有其他方法。不过,我想出的那个在代码方面有点笨重。
对流进行分区
这是我的大脑如何解决这个问题的。我创建了一个运算符来执行 RxJS partition
运算符所做的事情,但允许您划分为两个以上的流。
我的方法在内部处理多播,因此源可以是任何内容(热、冷、多播或非多播)。它(在内部)为每个流设置一个主题,然后您可以像往常一样使用 RxJS 的 debounceTime。
虽然有一个缺点。在您的方法中,您可以随意添加一个新的优先级字符串,它应该会继续工作。 {priority: "DucksSayQuack"} 的对象将相互去抖并且不会影响其他优先级。这甚至可以即时完成。
下面的partitionOn
运算符需要提前知道分区。对于您描述的情况,它应该具有相同的输出并且启动效率更高。
这样更好吗?我不知道,这是解决同一问题的一种有趣且不同的方法。此外,我认为 partitionOn
运算符的用途比分区去抖动更多。
运营商
/***
* Create a partitioned stream for each value where a passed
* predicate returns true
***/
function partitionOn<T>(
input$: Observable<T>,
predicates: ((v:T) => boolean)[]
): Observable<T>[] {
const partitions = predicates.map(predicate => ({
predicate,
stream: new Subject<T>()
}));
input$.subscribe({
next: (v:T) => partitions.forEach(prt => {
if(prt.predicate(v)){
prt.stream.next(v);
}
}),
complete: () => partitions.forEach(prt => prt.stream.complete()),
error: err => partitions.forEach(prt => prt.stream.error(err))
});
return partitions.map(prt => prt.stream.asObservable());
}
使用partitionOn进行优先去抖
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000;
const priorityEquals = a => b => a === b?.priority;
merge(
...partitionOn(
$source,
[priorityEquals('low'),
priorityEquals('medium'),
priorityEquals('high')]
).map(s => s.pipe(
debounceTime(1000)
))
);
为您的流添加时间戳
这种方法与您的方法非常相似,可以让您再次随意使用优先级字符串。这有一个类似的问题,即每个值都被放入计时器中,并且计时器不会在新值到达时被取消。
但是,使用这种方法,取消不必要的计时器的途径就更加清晰了。您可以将订阅对象与时间戳一起存储在 priorityTimeStamp
映射中,并确保在新值到达时取消订阅。
我真的不知道这可能会对性能造成什么影响,我认为 JavaScript 的事件循环很漂亮 robust/efficient。这种方法的好处是您无需支付多播的费用。这实际上只是一个流,使用查找映射来决定过滤什么,不过滤什么。
priorityDebounceTime 运算符
function priorityDebounceTime<T>(
dbTime: number,
priorityStr = "priority"
): MonoTypeOperatorFunction<T> {
return s => defer(() => {
const priorityTimeStamp = new Map<string, number>();
return s.pipe(
mergeMap(v => {
priorityTimeStamp.set(v[priorityStr], Date.now());
return timer(dbTime).pipe(
timestamp(),
filter(({timestamp}) =>
timestamp - priorityTimeStamp.get(v[priorityStr]) >= dbTime
),
mapTo(v)
)
})
)
});
}
使用 priorityDebounceTime 进行优先去抖
这个明显简单了一点:
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 5000;
$source.pipe(
priorityDebounceTime(delay)
).subscribe(console.log);