如何通过 RxJS 6 制作 SR Latch?

How to make SR Latch via RxJS 6?

我想通过 RxJS 制作 SR 锁存器。它应该采用 2 个流(#1 和 #2)并且仅当 #1 发出某些东西时才发出。

然后它应该忽略#1 并听#2 直到它发出一些东西。然后它应该"reset"(编辑:意味着停止发射可观察量,但保持订阅)并开始收听#1.

我做了一个快速的 jsfiddle:https://jsfiddle.net/fczjusqn/11/ 有一个错误 - 如果你按几次开始,它会让间隔开始几次。

这是在我的真实应用程序中使用有点复杂的设置逻辑模拟可暂停进程。

必填代码:

// const start$ = ...
// const stop$ = ...

start$.pipe(
  flatMap(() => {
    // some setup logic...
    // ...creating another stream...
    // ...still setting up...
    return Rx.interval(1000)
      .pipe(
        takeUntil(stop$)
      )
    })    
  ).subscribe(() => console.log('interval'))

我建议创建一个 input$ observable,它只触发从开始到停止或停止到开始的变化值:

const start$ = Rx.fromEvent(document.querySelector('#start'), 'click').pipe(op.mapTo(true));

const stop$ = Rx.fromEvent(document.querySelector('#stop'), 'click').pipe(op.mapTo(false));

const input$ = Rx.merge(start$, stop$).pipe(op.distinctUntilChanged());

这样,当您多次单击开始或停止时,它将被忽略。使用这个 observable,您可以简单地使用您已经使用过的流。

Here is a fork of your example with my suggestion.