RxJS 节流行为;立即获得第一个值
RxJS throttle behavior; get first value immediately
Plunkr 示例:https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview
我知道 RxJS (5.0 beta.4) 有以下 3 种限制方法:
auditTime()
、throttleTime()
和 debounceTime()
我正在寻找的行为是 lodash
默认情况下 throttle
:
-
- 马上给我第一个值!
-
- 在连续值上,保持给定延迟的值,然后发出最后出现的值
-
- 当油门延迟到期时,返回状态(1)
理论上这应该是这样的:
inputObservable
.do(() => cancelPreviousRequest())
.throttleTime(500)
.subscribe((value) => doNextRequest(value))
但是
throttleTime
永远不会给我最后一个值,如果在节流阀超时时发出
debounceTime
没有立即触发
auditTime
没有立即触发
我可以组合任何 RxJS 方法来实现所描述的行为吗?
对于较旧的 RxJ,我编写了一个 concatLatest
运算符来完成您想要的大部分操作。有了它,您可以通过以下代码获得节流行为:
const delay = Rx.Observable.empty().delay(500);
inputObservable
.map(value => Rx.Observable.of(value).concat(delay))
.concatLatest()
.subscribe(...);
接线员来了。我试着更新它以使用 RxJS5:
Rx.Observable.prototype.concatLatest = function () {
/// <summary>
/// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
/// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
/// away and only the Nth will be observed.
/// </summary>
/// <returns type="Rx.Observable"></returns>
var source = this;
return Rx.Observable.create(function (observer) {
var latest,
isStopped,
isBusy,
outerSubscription,
innerSubscription,
subscriptions = new Rx.Subscription(function () {
if (outerSubscription) {
outerSubscription.unsubscribe();
}
if (innerSubscription) {
innerSubscription.unsubscribe();
}
}),
onError = observer.error.bind(observer),
onNext = observer.next.bind(observer),
innerOnComplete = function () {
var inner = latest;
if (inner) {
latest = undefined;
if (innerSubscription) {
innerSubscription.unsubscribe();
}
innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
}
else {
isBusy = false;
if (isStopped) {
observer.complete();
}
}
};
outerSubscription = source.subscribe(function (newInner) {
if (isBusy) {
latest = newInner;
}
else {
isBusy = true;
if (innerSubscription) {
innerSubscription.unsubscribe();
}
innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
}
}, onError, function () {
isStopped = true;
if (!isBusy) {
observer.complete();
}
});
return subscriptions;
});
};
这是一个更新的 plunkr:https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview
请注意,我已将您的 lodash 版本更新为最新版本。在 lodash 4.7 中,我重写了 throttle/debounce 运算符以修复一些边缘情况错误。您使用的是 4.6.1,它仍然有一些错误,但我认为它们不会影响您的测试。
我采用 auditTime 运算符并更改了 2 行以实现所需的行为。
新 plunker:https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview
原文:
变化:
来自(审计时间):
protected _next(value: T): void {
this.value = value;
this.hasValue = true;
if (!this.throttled) {
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
}
}
clearThrottle(): void {
const { value, hasValue, throttled } = this;
if (throttled) {
this.remove(throttled);
this.throttled = null;
throttled.unsubscribe();
}
if (hasValue) {
this.value = null;
this.hasValue = false;
this.destination.next(value);
}
}
到(auditTimeImmediate):
protected _next(value: T): void {
this.value = value;
this.hasValue = true;
if (!this.throttled) {
// change 1:
this.clearThrottle();
}
}
clearThrottle(): void {
const { value, hasValue, throttled } = this;
if (throttled) {
this.remove(throttled);
this.throttled = null;
throttled.unsubscribe();
}
if (hasValue) {
this.value = null;
this.hasValue = false;
this.destination.next(value);
// change 2:
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
}
}
所以我在值 next
ed 之后开始超时。
用法:
inputObservable
.do(() => cancelPreviousRequest())
.auditTimeImmediate(500)
.subscribe((value) => doNextRequest(value))
对于 2018 年后寻找此内容的任何人:这是一年多前添加的,但由于某种原因,文档尚未更新。
您只需将配置对象传递给 throttleTime
。默认值为 { leading: true, trailing: false }
。要实现此处讨论的行为,您只需将 trailing
设置为 true
:{ leading: true, trailing: true }
编辑:
为了完整起见,这里有一个工作片段:
import { asyncScheduler } from 'rxjs'
import { throttleTime } from 'rxjs/operators'
...
observable.pipe(
throttleTime(100, asyncScheduler, { leading: true, trailing: true })
)
Plunkr 示例:https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview
我知道 RxJS (5.0 beta.4) 有以下 3 种限制方法:
auditTime()
、throttleTime()
和 debounceTime()
我正在寻找的行为是 lodash
默认情况下 throttle
:
-
- 马上给我第一个值!
-
- 在连续值上,保持给定延迟的值,然后发出最后出现的值
-
- 当油门延迟到期时,返回状态(1)
理论上这应该是这样的:
inputObservable
.do(() => cancelPreviousRequest())
.throttleTime(500)
.subscribe((value) => doNextRequest(value))
但是
throttleTime
永远不会给我最后一个值,如果在节流阀超时时发出debounceTime
没有立即触发auditTime
没有立即触发
我可以组合任何 RxJS 方法来实现所描述的行为吗?
对于较旧的 RxJ,我编写了一个 concatLatest
运算符来完成您想要的大部分操作。有了它,您可以通过以下代码获得节流行为:
const delay = Rx.Observable.empty().delay(500);
inputObservable
.map(value => Rx.Observable.of(value).concat(delay))
.concatLatest()
.subscribe(...);
接线员来了。我试着更新它以使用 RxJS5:
Rx.Observable.prototype.concatLatest = function () {
/// <summary>
/// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
/// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
/// away and only the Nth will be observed.
/// </summary>
/// <returns type="Rx.Observable"></returns>
var source = this;
return Rx.Observable.create(function (observer) {
var latest,
isStopped,
isBusy,
outerSubscription,
innerSubscription,
subscriptions = new Rx.Subscription(function () {
if (outerSubscription) {
outerSubscription.unsubscribe();
}
if (innerSubscription) {
innerSubscription.unsubscribe();
}
}),
onError = observer.error.bind(observer),
onNext = observer.next.bind(observer),
innerOnComplete = function () {
var inner = latest;
if (inner) {
latest = undefined;
if (innerSubscription) {
innerSubscription.unsubscribe();
}
innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
}
else {
isBusy = false;
if (isStopped) {
observer.complete();
}
}
};
outerSubscription = source.subscribe(function (newInner) {
if (isBusy) {
latest = newInner;
}
else {
isBusy = true;
if (innerSubscription) {
innerSubscription.unsubscribe();
}
innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
}
}, onError, function () {
isStopped = true;
if (!isBusy) {
observer.complete();
}
});
return subscriptions;
});
};
这是一个更新的 plunkr:https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview
请注意,我已将您的 lodash 版本更新为最新版本。在 lodash 4.7 中,我重写了 throttle/debounce 运算符以修复一些边缘情况错误。您使用的是 4.6.1,它仍然有一些错误,但我认为它们不会影响您的测试。
我采用 auditTime 运算符并更改了 2 行以实现所需的行为。
新 plunker:https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview
原文:
变化:
来自(审计时间):
protected _next(value: T): void {
this.value = value;
this.hasValue = true;
if (!this.throttled) {
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
}
}
clearThrottle(): void {
const { value, hasValue, throttled } = this;
if (throttled) {
this.remove(throttled);
this.throttled = null;
throttled.unsubscribe();
}
if (hasValue) {
this.value = null;
this.hasValue = false;
this.destination.next(value);
}
}
到(auditTimeImmediate):
protected _next(value: T): void {
this.value = value;
this.hasValue = true;
if (!this.throttled) {
// change 1:
this.clearThrottle();
}
}
clearThrottle(): void {
const { value, hasValue, throttled } = this;
if (throttled) {
this.remove(throttled);
this.throttled = null;
throttled.unsubscribe();
}
if (hasValue) {
this.value = null;
this.hasValue = false;
this.destination.next(value);
// change 2:
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
}
}
所以我在值 next
ed 之后开始超时。
用法:
inputObservable
.do(() => cancelPreviousRequest())
.auditTimeImmediate(500)
.subscribe((value) => doNextRequest(value))
对于 2018 年后寻找此内容的任何人:这是一年多前添加的,但由于某种原因,文档尚未更新。
您只需将配置对象传递给 throttleTime
。默认值为 { leading: true, trailing: false }
。要实现此处讨论的行为,您只需将 trailing
设置为 true
:{ leading: true, trailing: true }
编辑:
为了完整起见,这里有一个工作片段:
import { asyncScheduler } from 'rxjs'
import { throttleTime } from 'rxjs/operators'
...
observable.pipe(
throttleTime(100, asyncScheduler, { leading: true, trailing: true })
)