使用来自另一个可观察值的值过滤和去抖一个可观察值

Filter and debounce an observable using values from another observable

我有 2 个可观察对象:

我想做的是:

我看到 withLatestFrom(inspector).filter(...) 解决了一些类似的问题,但它对我不起作用,因为我需要在去抖时间期间从 inspector observable 发出的所有值。

我也尝试了 'merge' 运算符,但我只关心来源:如果检查员发出值但来源没有,我正在尝试构建的可观察对象也不应该。

有没有办法只使用可观察对象来实现这一点?

这可以通过使用 buffer 运算符来解决,该运算符仅在要求的时间间隔过去后发出通知,以防阻塞流未提前取消。

source$.pipe(
  buffer(source$.pipe(
    exhaustMap(() => timer(10).pipe(
      takeUntil(blocker$)
    ))
  ))
);

const {timer} = rxjs;
const {buffer, exhaustMap, takeUntil} = rxjs.operators;
const {TestScheduler} = rxjs.testing;
const {expect} = chai;

const test = (testName, testFn) => {
  try {
    testFn();
    console.log(`Test PASS "${testName}"`);
  } catch (error) {
    console.error(`Test FAIL "${testName}"`, error.message);
  }
}

const createTestScheduler = () => new TestScheduler((actual, expected) => {
  expect(actual).deep.equal(expected);
});

const createTestStream = (source$, blocker$) => {
  return source$.pipe(
    buffer(source$.pipe(
      exhaustMap(() => timer(10).pipe(
        takeUntil(blocker$)
      ))
    ))
  );
}

const testStream = ({ marbles, values}) => {
  const testScheduler = createTestScheduler();
  testScheduler.run((helpers) => {
    const { cold, hot, expectObservable } = helpers;
    const source$ = hot(marbles.source);
    const blocker$ = hot(marbles.blocker);
    const result$ = createTestStream(source$, blocker$)
    expectObservable(result$).toBe(marbles.result, values.result);
  });
}

test('should buffer changes with 10ms delay', () => {
  testStream({
    marbles: {
      source: ' ^-a-b 7ms ---c 9ms -----|   ',
      blocker: '^-   10ms --- 10ms -----|   ',
      result: ' --   10ms i-- 10ms j----(k|)',
    },
    values: {
      result: {
        i: ['a', 'b'],
        j: ['c'],
        k: [],
      },
    }
  });
});

test('should block buffer in progress and move values to next one', () => {
  testStream({
    marbles: {
      source: ' ^-a-b 7ms ---c 9ms -----|   ',
      blocker: '^-  8ms e---- 10ms -----|   ',
      result: ' --   10ms --- 10ms j----(k|)',
    },
    values: {
      result: {
        j: ['a', 'b', 'c'],
        k: [],
      },
    }
  });
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/chai/4.1.2/chai.js"></script>
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

分解这个问题很有帮助。我不太明白你在问什么,所以这是我最好的印象:

  • 源发出一个值。
  • 在初始发射之后,我们开始听取检查员的真实值。
  • 一旦 debounce 时间过去,如果检查器只发出 true,我们就会发出一个值。

我想做的第一个观察(请原谅双关语)是您不必使用 debounceTime 来获得类似去抖动的效果。我发现 switchMap 里面有一个 timer 可以产生相同的结果: SwitchMap 会取消之前的debouncetimer 等发射可以延迟发射。

我的建议是从源代码中使用 switchMap,然后结合 timer 和检查器创建一个可观察对象。使用 filter 运算符,以便仅在检查器最后一次发出的结果持续时间计时器的持续时间为 true.[=12= 时才从源发出]

this.source.pipe(
  switchMap(x => // switchMap will cancel any emissions if the timer hasn't emitted yet.
    // this combineLatest will only emit once - when the timer emits.
    combineLatest([
        timer(60000), 
        this.inspector.pipe(filter(x => !x), startWith(true))
    ]).pipe( 
      filter(([_, shouldEmit]) => shouldEmit), 
      mapTo(x) // emit source's value
    )
  )
)
  • 注意:startWith用于inspector的管道中,因此至少会发出一个结果。这保证一旦 timer 发出,就会有一次发射。过滤器在检查器上,因为您只关心错误的结果是否会阻止发射。
  • 如果您不想强迫用户等待一分钟,您可以使用 race 而不是 combineLatest。它将发出第一个发出的 observable 的结果。因此,您可以让计时器在一分钟后发出 true,而检查器发出任何 false 结果。
switchMap(x =>
  race(
    timer(6000).pipe(mapTo(true)), // emit true after a minute.
    this.inspector.pipe(filter(x => !x)) // only emit false
  ).pipe(
    take(1), // this might not be necessary.
    filter((shouldEmit) => shouldEmit), 
    mapTo(x) // emit source's value
  )
)