RxJS:只有在不同的情况下才去抖流
RxJS: debounce a stream only if distinct
我想对流进行去抖动 - 但前提是源值与以前相同。我将如何使用 RxJS 5 执行此操作?
如果值相同并且我之前在指定时间内发出过,我不想发出一个值 window。我应该能够使用流中的值 - 或者比较类似于 distinctUntilChanged 的函数。
如果不创建您自己的运算符,我不知道有什么方法可以做到这一点,因为您需要维护某种状态(最后看到的值)。
一种方式看起来像这样:
// I named this debounceDistinctUntilChanged but that might not be
// the best name. Name it whatever you think makes sense!
function debounceDistinctUntilChanged(delay) {
const source$ = this;
return new Observable(observer => {
// Using an object as the default value
// so that the first time we check it
// if its the same its guaranteed to be false
// because every object has a different identity.
// Can't use null or undefined because source may
// emit these!
let lastSeen = {};
return source$
.debounce(value => {
// If the last value has the same identity we'll
// actually debounce
if (value === lastSeen) {
return Observable.timer(delay);
} else {
lastSeen = value;
// This will complete() right away so we don't actually debounce/buffer
// it at all
return Observable.empty();
}
})
.subscribe(observer);
});
}
现在您看到了一个实现,您可能(或可能不会)发现它与您的期望不同。您的描述实际上遗漏了某些细节,例如它是否应该只是您在去抖动时间范围内保留的 last 值,或者它是否是一个集合——基本上是 distinctUntilChanged
与 distinct
。我假设后者。
无论哪种方式,希望这能为您提供一个起点,并揭示创建自定义运算符是多么容易。内置运算符肯定不会按原样为所有内容提供解决方案,因此任何足够先进的应用程序都需要自己制作(或者在不抽象的情况下内联执行命令式操作,这也很好)。
然后您可以通过将其放在 Observable 原型上来使用此运算符:
Observable.prototype.debounceDistinctUntilChanged = debounceDistinctUntilChanged;
// later
source$
.debounceDistinctUntilChanged(400)
.subscribe(d => console.log(d));
或使用 let
:
// later
source$
.let(source$ => debounceDistinctUntilChanged.call($source, 400))
.subscribe(d => console.log(d));
如果可以的话,我建议您真正理解我的代码的作用,这样您以后就可以轻松地制定自己的解决方案。
这取决于你想做什么;当我尝试做类似的事情时,我遇到了这个问题,基本上是去抖动但对对象的不同值使用不同的去抖动。
在尝试了 jayphelps 的解决方案后,我无法让它按照我想要的方式运行。经过多次来回,原来有一个内置的简单方法可以做到这一点:groupby。
const priceUpdates = [
{bid: 10, id: 25},
{bid: 20, id: 30},
{bid: 11, id: 25},
{bid: 21, id: 30},
{bid: 25, id: 30}
];//emit each person
const source = Rx.Observable.from(priceUpdates);
//group by age
const example = source
.groupBy(bid => bid.id)
.mergeMap(group$ => group$.debounceTime(500))
const subscribe = example.subscribe(val => console.log(val));
输出:
[object Object] {
bid: 11,
id: 25
}
[object Object] {
bid: 25,
id: 30
}
Jsbin: http://jsbin.com/savahivege/edit?js,console
此代码将按出价 ID 分组并对其进行去抖动,因此仅发送每个的最后一个值。
rxjs 6 更新:
source$
.pipe(
// debounceTime(300), optionally un-comment this to add debounce
distinctUntilChanged(),
)
.subscribe(v => console.log(v))
这个 rxjs6+ 运算符将在源 'value' 发生变化时或自上次发出后已经过去 'delay' 时间后发出(即使 'value' 没有变化):
export function throttleUntilChanged(delay: number) {
return (source: Observable<any>) => {
return new Observable(observer => {
let lastSeen = {};
let lastSeenTime = 0;
return source
.pipe(
flatMap((value: any) => {
const now = Date.now();
if (value === lastSeen && (now - lastSeenTime) < delay ) {
return empty();
} else {
lastSeen = value;
lastSeenTime = now;
return of(value);
}
})
)
.subscribe(observer);
});
};
}
使用@samberic 建议的方法为 RxJS 6+ 提供答案 ngrx
对来自与 RxJS 6 相同源 ID 的操作进行分组。
this.actions$.pipe(
ofType(actionFoo, actionBar), // Two different ngrx action with an id property
groupBy(action => action.id), // Group by the id from the source
mergeMap(action => action.pipe(
debounceTime(5000)
))
).pipe(
// Do whatever it is that your effect is supposed to do!
)
这是我的 RXJS 6+ 打字稿版本,可以按最初要求 100% 工作。对每个新源值进行去抖动(重启计时器)。仅当新值与先前值不同或去抖时间已过时才发出值。
// custom rxjs operator to debounce while the source emits the same values
debounceDistinct<T>(delay: number) {
return (source: Observable<T>): Observable<T> => {
return new Observable(subscriber => {
let hasValue = false;
let lastValue: T | null = null;
let durationSub: Subscription = null;
const emit = () => {
durationSub?.unsubscribe();
durationSub = null;
if (hasValue) {
// We have a value! Free up memory first, then emit the value.
hasValue = false;
const value = lastValue!;
lastValue = null;
subscriber.next(value);
}
};
return source.subscribe(
(value: T) => {
// new value received cancel timer
durationSub?.unsubscribe();
// emit lastValue if the value has changed
if (hasValue && value !== lastValue) {
const value = lastValue!;
subscriber.next(value);
}
hasValue = true;
lastValue = value;
// restart timer
durationSub = timer(delay).subscribe(() => {
emit();
});
},
error => {
},
() => {
emit();
subscriber.complete();
lastValue = null;
});
});
}
}
另一种可能性,但不确定 rxjs5 是否支持:
source$.pipe(
pairwise(),
debounce(([a, b]) => {
if (a === b) {
return interval(1000)
}
return of();
}),
map(([a,b]) => b)
)
.subscribe(console.log);
https://stackblitz.com/edit/typescript-39nq7f?file=index.ts&devtoolsheight=50
我想对流进行去抖动 - 但前提是源值与以前相同。我将如何使用 RxJS 5 执行此操作?
如果值相同并且我之前在指定时间内发出过,我不想发出一个值 window。我应该能够使用流中的值 - 或者比较类似于 distinctUntilChanged 的函数。
如果不创建您自己的运算符,我不知道有什么方法可以做到这一点,因为您需要维护某种状态(最后看到的值)。
一种方式看起来像这样:
// I named this debounceDistinctUntilChanged but that might not be
// the best name. Name it whatever you think makes sense!
function debounceDistinctUntilChanged(delay) {
const source$ = this;
return new Observable(observer => {
// Using an object as the default value
// so that the first time we check it
// if its the same its guaranteed to be false
// because every object has a different identity.
// Can't use null or undefined because source may
// emit these!
let lastSeen = {};
return source$
.debounce(value => {
// If the last value has the same identity we'll
// actually debounce
if (value === lastSeen) {
return Observable.timer(delay);
} else {
lastSeen = value;
// This will complete() right away so we don't actually debounce/buffer
// it at all
return Observable.empty();
}
})
.subscribe(observer);
});
}
现在您看到了一个实现,您可能(或可能不会)发现它与您的期望不同。您的描述实际上遗漏了某些细节,例如它是否应该只是您在去抖动时间范围内保留的 last 值,或者它是否是一个集合——基本上是 distinctUntilChanged
与 distinct
。我假设后者。
无论哪种方式,希望这能为您提供一个起点,并揭示创建自定义运算符是多么容易。内置运算符肯定不会按原样为所有内容提供解决方案,因此任何足够先进的应用程序都需要自己制作(或者在不抽象的情况下内联执行命令式操作,这也很好)。
然后您可以通过将其放在 Observable 原型上来使用此运算符:
Observable.prototype.debounceDistinctUntilChanged = debounceDistinctUntilChanged;
// later
source$
.debounceDistinctUntilChanged(400)
.subscribe(d => console.log(d));
或使用 let
:
// later
source$
.let(source$ => debounceDistinctUntilChanged.call($source, 400))
.subscribe(d => console.log(d));
如果可以的话,我建议您真正理解我的代码的作用,这样您以后就可以轻松地制定自己的解决方案。
这取决于你想做什么;当我尝试做类似的事情时,我遇到了这个问题,基本上是去抖动但对对象的不同值使用不同的去抖动。
在尝试了 jayphelps 的解决方案后,我无法让它按照我想要的方式运行。经过多次来回,原来有一个内置的简单方法可以做到这一点:groupby。
const priceUpdates = [
{bid: 10, id: 25},
{bid: 20, id: 30},
{bid: 11, id: 25},
{bid: 21, id: 30},
{bid: 25, id: 30}
];//emit each person
const source = Rx.Observable.from(priceUpdates);
//group by age
const example = source
.groupBy(bid => bid.id)
.mergeMap(group$ => group$.debounceTime(500))
const subscribe = example.subscribe(val => console.log(val));
输出:
[object Object] {
bid: 11,
id: 25
}
[object Object] {
bid: 25,
id: 30
}
Jsbin: http://jsbin.com/savahivege/edit?js,console
此代码将按出价 ID 分组并对其进行去抖动,因此仅发送每个的最后一个值。
rxjs 6 更新:
source$
.pipe(
// debounceTime(300), optionally un-comment this to add debounce
distinctUntilChanged(),
)
.subscribe(v => console.log(v))
这个 rxjs6+ 运算符将在源 'value' 发生变化时或自上次发出后已经过去 'delay' 时间后发出(即使 'value' 没有变化):
export function throttleUntilChanged(delay: number) {
return (source: Observable<any>) => {
return new Observable(observer => {
let lastSeen = {};
let lastSeenTime = 0;
return source
.pipe(
flatMap((value: any) => {
const now = Date.now();
if (value === lastSeen && (now - lastSeenTime) < delay ) {
return empty();
} else {
lastSeen = value;
lastSeenTime = now;
return of(value);
}
})
)
.subscribe(observer);
});
};
}
使用@samberic 建议的方法为 RxJS 6+ 提供答案 ngrx
对来自与 RxJS 6 相同源 ID 的操作进行分组。
this.actions$.pipe(
ofType(actionFoo, actionBar), // Two different ngrx action with an id property
groupBy(action => action.id), // Group by the id from the source
mergeMap(action => action.pipe(
debounceTime(5000)
))
).pipe(
// Do whatever it is that your effect is supposed to do!
)
这是我的 RXJS 6+ 打字稿版本,可以按最初要求 100% 工作。对每个新源值进行去抖动(重启计时器)。仅当新值与先前值不同或去抖时间已过时才发出值。
// custom rxjs operator to debounce while the source emits the same values
debounceDistinct<T>(delay: number) {
return (source: Observable<T>): Observable<T> => {
return new Observable(subscriber => {
let hasValue = false;
let lastValue: T | null = null;
let durationSub: Subscription = null;
const emit = () => {
durationSub?.unsubscribe();
durationSub = null;
if (hasValue) {
// We have a value! Free up memory first, then emit the value.
hasValue = false;
const value = lastValue!;
lastValue = null;
subscriber.next(value);
}
};
return source.subscribe(
(value: T) => {
// new value received cancel timer
durationSub?.unsubscribe();
// emit lastValue if the value has changed
if (hasValue && value !== lastValue) {
const value = lastValue!;
subscriber.next(value);
}
hasValue = true;
lastValue = value;
// restart timer
durationSub = timer(delay).subscribe(() => {
emit();
});
},
error => {
},
() => {
emit();
subscriber.complete();
lastValue = null;
});
});
}
}
另一种可能性,但不确定 rxjs5 是否支持:
source$.pipe(
pairwise(),
debounce(([a, b]) => {
if (a === b) {
return interval(1000)
}
return of();
}),
map(([a,b]) => b)
)
.subscribe(console.log);
https://stackblitz.com/edit/typescript-39nq7f?file=index.ts&devtoolsheight=50