RxJS Filtered/Grouped 去抖动

RxJS Filtered/Grouped Debounce

我正在尝试使用 RxJS 去抖动运算符,但我想自定义何时对来自源的发射进行去抖动。

默认情况下,任何 去抖window 内源的发射将导致先前的发射被丢弃。我希望 只有来自源的某些 发射计入去抖动操作,基于源发射的值。

假设我有一个对象的可观察对象,如下所示:

{
  priority: 'low'    //can be 'low' or 'medium' or 'high
}

我希望去抖按对象的优先级分组。这意味着只有当一个发射具有相同的优先级时,它才会被另一个发射去抖动。

即只有 'low' 排放可以消除 'low' 排放,只有 'high' 排放可以消除 'high' 排放。如果在等待 'low' 发射时出现 'medium' 发射,它不会导致 'low' 发射被丢弃。

这意味着如果我快速连续地进行 'low' 发射和 'medium' 发射,两者都会通过。如果我有两个 'low' 快速连续发射,只有最后一个会通过。

这是我想出的:

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000

$source.pipe(
    mergeMap(value => {
       
        // We start a race of the value with a delay versus any other emissions from the source with the same priority
        return race(
            timer(delay).pipe(mapTo(value)),
            $source.pipe(
                filter(v => v.priority === value.priority),
            )
        ).pipe(
            take(1),
            // If another emission with the same priority comes before the delay, the second racer it will win the race.
            // If no emission with the same priority comes, the first racer will win.
            //
            // If the first racer wins, this equality check is satisfied and the value is passed through.
            // If the second racer wins, the equality check fails and no value is emitted. Since this is a mergeMap, this whole process will start again for that emission.
            filter(v => v === value),
        )
    })
)

认为 以上是正确的,但我想知道我是否遗漏了什么或使这种方式变得比需要的更复杂?上面的代码应该像合并三个独立的流 $low.pipe(debounceTime(delay)) $medium.pipe(debounceTime(delay))$high.pipe(debounceTime(delay)).

一样运行

谢谢!!

我认为你的回答有效。这也很清楚。但是,您必须确保您的 $source 是多播的。

我看到你的方法有一个缺点:

你做了很多额外的计算。如果您每秒去抖动 1000 个值,它可能会明显变慢,具体取决于它所在的位置 运行.

每个流值都可以在任意数量的比赛中。来自不同优先级的输入仍然相互竞争,当下一个值开始竞争时,前一个竞争并没有停止,所以如果有很多值同时到达,你可以得到 timers/races 的爆炸。

设置和删除了很多额外的计时器。在您的情况下,您最多需要三个计时器,每个计时器都会在具有相同优先级的新值到达时重置。

如果您的代码不在关键路径上,那可能不是问题。否则,还有其他方法。不过,我想出的那个在代码方面有点笨重。

对流进行分区

这是我的大脑如何解决这个问题的。我创建了一个运算符来执行 RxJS partition 运算符所做的事情,但允许您划分为两个以上的流。

我的方法在内部处理多播,因此源可以是任何内容(热、冷、多播或非多播)。它(在内部)为每个流设置一个主题,然后您可以像往常一样使用 RxJS 的 debounceTime。

虽然有一个缺点。在您的方法中,您可以随意添加一个新的优先级字符串,它应该会继续工作。 {priority: "DucksSayQuack"} 的对象将相互去抖并且不会影响其他优先级。这甚至可以即时完成。

下面的partitionOn运算符需要提前知道分区。对于您描述的情况,它应该具有相同的输出并且启动效率更高。

这样更好吗?我不知道,这是解决同一问题的一种有趣且不同的方法。此外,我认为 partitionOn 运算符的用途比分区去抖动更多。

运营商

/***
 * Create a partitioned stream for each value where a passed 
 * predicate returns true
 ***/
function partitionOn<T>(
  input$: Observable<T>, 
  predicates: ((v:T) => boolean)[]
): Observable<T>[] {
  const partitions = predicates.map(predicate => ({
    predicate,
    stream: new Subject<T>()
  }));

  input$.subscribe({
    next: (v:T) => partitions.forEach(prt => {
      if(prt.predicate(v)){
        prt.stream.next(v);
      } 
    }),
    complete: () => partitions.forEach(prt => prt.stream.complete()),
    error: err => partitions.forEach(prt => prt.stream.error(err))
  });

  return partitions.map(prt => prt.stream.asObservable());
}

使用partitionOn进行优先去抖

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000;

const priorityEquals = a => b => a === b?.priority;

merge(
  ...partitionOn(
    $source,
    [priorityEquals('low'),
    priorityEquals('medium'),
    priorityEquals('high')]
  ).map(s => s.pipe(
    debounceTime(1000)
  ))
);

为您的流添加时间戳

这种方法与您的方法非常相似,可以让您再次随意使用优先级字符串。这有一个类似的问题,即每个值都被放入计时器中,并且计时器不会在新值到达时被取消。

但是,使用这种方法,取消不必要的计时器的途径就更加清晰了。您可以将订阅对象与时间戳一起存储在 priorityTimeStamp 映射中,并确保在新值到达时取消订阅。

我真的不知道这可能会对性能造成什么影响,我认为 JavaScript 的事件循环很漂亮 robust/efficient。这种方法的好处是您无需支付多播的费用。这实际上只是一个流,使用查找映射来决定过滤什么,不过滤什么。

priorityDebounceTime 运算符

function priorityDebounceTime<T>(
  dbTime: number, 
  priorityStr = "priority"
): MonoTypeOperatorFunction<T> {

  return s => defer(() => {
    const priorityTimeStamp = new Map<string, number>();
    return s.pipe(
      mergeMap(v => {
        priorityTimeStamp.set(v[priorityStr], Date.now());
        return timer(dbTime).pipe(
          timestamp(),
          filter(({timestamp}) => 
            timestamp - priorityTimeStamp.get(v[priorityStr]) >= dbTime
          ),
          mapTo(v)
        )
      })
    )
  });

}

使用 priorityDebounceTime 进行优先去抖

这个明显简单了一点:

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 5000;

$source.pipe(
  priorityDebounceTime(delay)
).subscribe(console.log);