RxJS 节流行为;立即获得第一个值

RxJS throttle behavior; get first value immediately

Plunkr 示例:https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview

我知道 RxJS (5.0 beta.4) 有以下 3 种限制方法:

auditTime()throttleTime()debounceTime()

我正在寻找的行为是 lodash 默认情况下 throttle:

理论上这应该是这样的:

inputObservable
  .do(() => cancelPreviousRequest())
  .throttleTime(500)
  .subscribe((value) => doNextRequest(value))

但是

我可以组合任何 RxJS 方法来实现所描述的行为吗?

对于较旧的 RxJ,我编写了一个 concatLatest 运算符来完成您想要的大部分操作。有了它,您可以通过以下代码获得节流行为:

const delay = Rx.Observable.empty().delay(500);
inputObservable
    .map(value => Rx.Observable.of(value).concat(delay))
    .concatLatest()
    .subscribe(...);

接线员来了。我试着更新它以使用 RxJS5:

Rx.Observable.prototype.concatLatest = function () {
    /// <summary>
    /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
    /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
    /// away and only the Nth will be observed.
    /// </summary>
    /// <returns type="Rx.Observable"></returns>
    var source = this;

    return Rx.Observable.create(function (observer) {
        var latest,
            isStopped,
            isBusy,
            outerSubscription,
            innerSubscription,
            subscriptions = new Rx.Subscription(function () {
              if (outerSubscription) {
                outerSubscription.unsubscribe();
              }
              if (innerSubscription) {
                innerSubscription.unsubscribe();
              }
            }),
            onError = observer.error.bind(observer),
            onNext = observer.next.bind(observer),
            innerOnComplete = function () {
                var inner = latest;
                if (inner) {
                    latest = undefined;
                    if (innerSubscription) {
                      innerSubscription.unsubscribe();
                    }
                    innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
                }
                else {
                    isBusy = false;
                    if (isStopped) {
                        observer.complete();
                    }
                }
            };

        outerSubscription = source.subscribe(function (newInner) {
            if (isBusy) {
                latest = newInner;
            }
            else {
                isBusy = true;
                if (innerSubscription) {
                  innerSubscription.unsubscribe();
                }
                innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
            }
        }, onError, function () {
            isStopped = true;
            if (!isBusy) {
                observer.complete();
            }
        });

        return subscriptions;
    });
};

这是一个更新的 plunkr:https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview

请注意,我已将您的 lodash 版本更新为最新版本。在 lodash 4.7 中,我重写了 throttle/debounce 运算符以修复一些边缘情况错误。您使用的是 4.6.1,它仍然有一些错误,但我认为它们不会影响您的测试。

我采用 auditTime 运算符并更改了 2 行以实现所需的行为。

新 plunker:https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

原文:

变化:

来自(审计时间):

protected _next(value: T): void {
  this.value = value;
  this.hasValue = true;
  if (!this.throttled) {
    this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
  }
}

clearThrottle(): void {
  const { value, hasValue, throttled } = this;
  if (throttled) {
    this.remove(throttled);
    this.throttled = null;
    throttled.unsubscribe();
  }
  if (hasValue) {
    this.value = null;
    this.hasValue = false;
    this.destination.next(value);
  }
}

到(auditTimeImmediate):

protected _next(value: T): void {
    this.value = value;
    this.hasValue = true;
    if (!this.throttled) {
        // change 1:
        this.clearThrottle();
    }
}

clearThrottle(): void {
    const { value, hasValue, throttled } = this;
    if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
    }
    if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
        // change 2:
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
    }
}

所以我在值 nexted 之后开始超时。

用法:

inputObservable
  .do(() => cancelPreviousRequest())
  .auditTimeImmediate(500)
  .subscribe((value) => doNextRequest(value))

对于 2018 年后寻找此内容的任何人:这是一年多前添加的,但由于某种原因,文档尚未更新。

RxJS commit

您只需将配置对象传递给 throttleTime。默认值为 { leading: true, trailing: false }。要实现此处讨论的行为,您只需将 trailing 设置为 true{ leading: true, trailing: true }

编辑:

为了完整起见,这里有一个工作片段:

import { asyncScheduler } from 'rxjs'
import { throttleTime } from 'rxjs/operators'

...

observable.pipe(
  throttleTime(100, asyncScheduler, { leading: true, trailing: true })
)