如何平滑流式传输 Rxjs?

How to smooth stream Rxjs?

我有一个 Rxjs:

   private loading = new BehaviorSubject(false);

loading可以从true快速更改为false。如何平滑此流并仅在 falsetrue 之间的时间超过 3 秒时传递值,否则默认传递 false?

您可以使用 debounceTime() 让您的流在发出值之前等待给定的持续时间。 (https://www.learnrxjs.io/learn-rxjs/operators/filtering/debouncetime)

要仍然直接发出默认值,当您订阅流时,您必须使用startWith()在(去抖动的)流前加上默认值(https://www.learnrxjs.io/learn-rxjs/operators/combination/startwith)

const $loadingState = loading
  .pipe(
    debounceTime(5 * 1000),
    startWith(false)
  );

你好梅根,你可以看看下面的演示,我认为它会帮助你解决你面临的问题。 (我使用的是 1500 毫秒而不是 3000 毫秒,但这并不重要)

所以问题的第一部分是您希望能够以特定时间间隔获取有关加载状态(存储在 BehavourSubject 中的值)的过去状态的信息。

这是通过 buffer 运算符和 interval 可观察值的组合实现的。这个想法是 buffer 将记录所有通过流发出的事件,并在作为参数传递的可观察对象发出值时传递它们(在我们的例子中,每 1500 毫秒 interval 可观察对象将发出新值)

因此每 1500 毫秒我们将从加载主题中获取新值。

接下来我们需要处理的是保证主题在过去的 1500 毫秒内没有变化,我实现的方法是使用 scan 运算符来存储最后发出的值来自主题。

在后续 filter 运算符中使用这些汇总数据,我正在检查主题中是否写入了任何新值(如果有的话,它们可以通过 newEvents 变量,它将是最后缓冲的主题值的数组)

所以过滤条件基本上是说如果在过去的 1500 毫秒内没有事件,则应该传递最新的加载值。

在管道的末端distinctUntilChanged用于过滤掉任何重复的加载状态,所以只有当新的load/loaded状态存在时,HTML中的值是改变了。

let {
  BehaviorSubject,
  interval
} = rxjs
let {
  buffer,
  scan,
  map,
  filter,
  distinctUntilChanged
} = rxjs.operators

let someFlagToSaveTheCurrentState = false;
let currentState = document.getElementById('currentState')

let loading = document.getElementById('loading')
let loadingBtn = document.getElementById('loadingbtn')

let loader = new BehaviorSubject(false)

loadingBtn.addEventListener('click', () => {
  someFlagToSaveTheCurrentState = !someFlagToSaveTheCurrentState;
  loader.next(someFlagToSaveTheCurrentState);

  currentState.innerHTML = someFlagToSaveTheCurrentState;
})

let interval$ = interval(1500)

loader.pipe(
  buffer(interval$),
  scan((acc,e) => ([...acc.slice(1),e]), [[false], [false]]),
  filter(([lastEvents, newEvents]) => newEvents.length === 0 && lastEvents.length !== 0),
  map(([lastEvents, newEvents]) => {
  let result = false;
  if (newEvents.length === 0 && lastEvents.length !== 0) {
    result = lastEvents[lastEvents.length - 1]
  }
  return result;
}),
distinctUntilChanged()
  
).subscribe((loadingState) => {
  console.log(loadingState)
  if (loadingState) {
    loading.innerHTML = 'Element loading'
  } else {
    loading.innerHTML = 'Element is loaded'
  }
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.js"></script>

<h1 id="loading">Loading</h1>

<button id="loadingbtn">Toggle loading</button>
<h1 id="currentState">false</h1>

尝试运行代码段