RxJS 限制相同的值但让新值通过

RxJS throttle same value but let new values through

"Here you have",有人说,你得到了这个输入值流,你有点想在...

上执行 distinctUntilChanged()
Input:  '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'

到目前为止没有什么奇怪的,
但是现在有人说 "it's okey" 如果相同的值再次出现,"but only if it's not to soon!"。我想要至少 '----' 相同值之间的滴答声。 "Okey"你说你加个油门

const source = new Subject<number>();

// mysterious cave troll is randomly source.next(oneOrTwo)

const example = source.pipe(throttle(val => interval(4000)));

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'

"That's not what I want! Look at all the value you missed",指的是您对流式传输的所有值进行了限制。

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
        '-------------->1<--------->2<----->1<------|' <-- Missed values

"Here, let me show show you"神秘人说给你这个

想要输出

Input:  '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'

我对此的回答是,感觉合并 window 不行。

来自更有经验的人,
这是一个很难解决的问题吗?(或者我错过了一个明显的解决方案)

我没想到,您想按时间间隔进行缓冲,然后在每个缓冲区中区分。

实际上你想每 n 毫秒重新启动/重新启动不同的 运行。

source.pipe(
  bufferTime(ms),
  mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)

这是我的第二次尝试,它通过输出过滤流(而不是采用 distinctUntil),然后限制并合并两个流。

当然,我们可能没有一组已知的值 (1,2,...n)。
如果我能弄清楚这个问题,我会再举一个例子。

const output = merge(
  source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
  source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)

这是我的支票 (ms = 4000)

input         1-1----11---2--1112----1---2---2-2-1-2-----
expected      1------1----2--1--2----1---2-----2-1-------

filter(1)     1-1----11------111-----1-----------1-------
throttle(1)   1------1-------1-------1-----------1-------

filter(2)     ------------2-----2--------2---2-2---2-----
throttle(2)   ------------2-----2--------2-----2---------

merged        1------1----2--1--2----1---2-----2-1-------
expected      1------1----2--1--2----1---2-----2-1-------

扩展到 n 个值

我认为这适用于事先不知道流中的值集的情况(或者范围很大,因此扩展以前的答案是不切实际的)。

只要源代码完成,它就应该可以工作。

merge(
  source.pipe(
    distinct().pipe(
      mapTo(distinctVal => source.pipe( 
        filter(val = val === distinctVal), 
        throttle(val => interval(ms))
      )
    )  
  )
)

我还没有证据,post 下一个。

首先我想出了以某种方式组合 distinctUntilChanged()throttleTimte() 的想法,但是我无法想出解决方案,然后我尝试了其他方法。

我想出的运算符是 throttleDistinct(),可以按您的意愿工作:StackBlit Editor Link

它有 2 个参数,它们是:

  1. duration: number 以毫秒为单位,类似于 持续时间 throttleTime(duration: number)
  2. equals: (a: T, b: T) => boolean 比较上一项是否等于下一项的函数,有默认值 (a, b) => a === b
  3. 的实施

import { of, fromEvent, interval, Observable } from 'rxjs';
import { map, scan, filter, } from 'rxjs/operators';

const source = fromEvent(document, 'keypress')
  .pipe(map((x: any) => x.keyCode as number))

source
  .pipe(
    throttleDistinct(1000),
  )
  .subscribe((x) => console.log('__subscribe__', x));

export function throttleDistinct<T>(
  duration: number,
  equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
  return (source: Observable<T>) => {
    return source
      .pipe(
        map((x) => {
          const obj = { val: x, time: Date.now(), keep: true };
          return obj;
        }),
        scan((acc, cur) => {
          const diff = cur.time - acc.time;

          const isSame = equals(acc.val, cur.val)
          return diff > duration || (diff < duration && !isSame)
            ? { ...cur, keep: true }
            : { ...acc, keep: false };
        }),
        filter((x) => x.keep),
        map((x) => x.val),
      )
  }
}

这是一个基于算子理论的棘手解决方案,但我不确定它是否真的有效,因为我需要先模拟源发射。

所以 throttle 和不同的流总是缓存最新的值,zip 确保它们总是成对发射,zip 总是在任何流发射时发射,因为它是 shareReplay(1)。

我们总是从 distinctStream 中获取值,即使 zip 流被 throttle 触发,因为 distinctStream 总是有最后缓存的值。

const throttleStream= source.pipe(throttle(val => interval(4000)),shareReplay(1))
const distinctStream= source.pipe(distinctUntilChanged(),shareReplay(1))
zip(throttleStream,distinctStream).pipe(
   map((t,d)=>d)
)

我找到了一个有效的解决方案,有人对此有任何看法吗?

source.pipe(
   windowTime(4000),
   concatMap(obs => obs.pipe(distinct()))
);

之前的示例,在 StackBlitz example

更新: 这实际上并不是 100% 有效。它只考虑当前的 window。所以你可以有

`[1-12][2---]` which would give `1--22---|`

其中 [----] 代表时间 window。换句话说,如果一个值首先在一个 window 中最后发出,并在下一个 window 中首先发出,则相同的值将紧接着彼此传递。

感谢@eric99 让我意识到这一点。