如何去抖直到值改变(超时)?
How to debounce until value changed (with timeout)?
我需要一个 RX 操作来消除流中元素的抖动,直到值发生变化。如果值在一段时间内没有变化,它还必须支持发出最后一个元素的超时。
t标记超时
DistinctUntilChanged
有点相似,但我想要最后一个相等的项目,而不是第一个。我尝试使用 Buffer
和 GroupBy
并选择组中的最后一个,但我需要在每个元素上重置计时器以确保在选择最后一个之前该组包含所有相等的元素。
我做了一个使用 Timeout
和 Retry
的实现,但我对每次超时都必须重新订阅源不太满意,因为这可能不适合所有 scenarios/sources(即冷可观察量)。不过,我测试过的热观测值似乎工作正常。
public static IObservable<TSource> ThrottleBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout, IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer ??= EqualityComparer<TKey>.Default;
scheduler ??= DefaultScheduler.Instance;
var prev = default(TSource);
return source
.StartWith(default(TSource))
.Select(e =>
{
var ret = !comparer.Equals(keySelector(prev), keySelector(e)) ? prev : default;
prev = e;
return ret;
})
.Where(e => !Equals(e, default(TSource)))
.Timeout(timeout, scheduler)
.RetryWhen(ex => ex.OfType<TimeoutException>());
}
由于 Throttle 在 Rx.NET 中的工作方式,决定将其称为 ThrottleBy 而不是 DebounceBy。
关于如何实施这样的操作 could/should 有什么想法吗?
编辑:感谢弹珠图。我把它变成了一些测试用例。你是对的,我以前的解决方案缺少计时器。我在这里补充说,他将每条消息加倍,但一个立即通过,另一个延迟。解决方法如下:
public static IObservable<TSource> ThrottleBy4<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.Timestamp(scheduler)
.Publish(_val => Observable.Merge( // For every incoming item, create two items: One immediate, one delayed by the timeout time.
_val.Select(v => (value: v, isOriginal: true)),
_val.Select(v => (value: v, isOriginal: false)).Delay(timeout, scheduler)
))
.StateSelect(Timestamped.Create(default(TSource), DateTimeOffset.MinValue),
(prevVal, t) => // Result function
{
// special handling for the initial state
if (prevVal.Timestamp == DateTimeOffset.MinValue)
return (prevVal, false);
if (t.isOriginal) // If an original value, only emit if the value changed.
return (prevVal, !comparer.Equals(keySelector(t.value.Value), keySelector(prevVal.Value)));
else // If a repeat value, only emit if the prevVal state is the same timestamp and value.
return (prevVal, comparer.Equals(keySelector(t.value.Value), keySelector(prevVal.Value)) && t.value.Timestamp == prevVal.Timestamp);
},
(prevVal, t) => t.isOriginal ? t.value : prevVal // State function. Only change state if the incoming item is an original value.
)
.Where(t => t.Item2)
.Select(t => t.Item1.Value);
}
测试代码如下:
TestScheduler ts = new TestScheduler();
var source = ts.CreateHotObservable<string>(
new Recorded<Notification<string>>(200.MsTicks(), Notification.CreateOnNext("A1")),
new Recorded<Notification<string>>(300.MsTicks(), Notification.CreateOnNext("A2")),
new Recorded<Notification<string>>(500.MsTicks(), Notification.CreateOnNext("B1")),
new Recorded<Notification<string>>(800.MsTicks(), Notification.CreateOnNext("B2"))
);
var comparer = new FirstLetterComparer();
var target = source
.ThrottleBy4(s => s, TimeSpan.FromSeconds(1), comparer: comparer, scheduler: ts);
var expectedResults = ts.CreateHotObservable<string>(
new Recorded<Notification<string>>(500.MsTicks(), Notification.CreateOnNext("A2")),
new Recorded<Notification<string>>(1800.MsTicks(), Notification.CreateOnNext("B2"))
);
var observer = ts.CreateObserver<string>();
target.Subscribe(observer);
ts.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
和这些帮手 类:
public class FirstLetterComparer : IEqualityComparer<string>
{
public bool Equals(string s1, string s2)
{
if (s1 == null && s2 == null)
return true;
if (s1 == null || s2 == null)
return false;
return (s1[0] == s2[0]);
}
public int GetHashCode(string s)
{
return s == null ? 0 : s[0].GetHashCode();
}
}
public static class X
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
}
上一个回答:
我发现您的解决方案有两个可能的问题(除了 Timeout
问题):
- 使用
default(T)
作为 token-value 有时会让您感到困惑。例如,这将不允许 0
在 IObservable<int>
中通过。
- 由于您使用字段
prev
,您可能遇到多重订阅问题。多个订阅者将共享该字段,这可能会导致竞争条件和不正确的行为。
您可以通过返回一个元组来解决这两个问题,一个值包含是否为新的布尔值,另一个值包含以下值:
public static IObservable<TSource> ThrottleBy2<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.StateSelect(default(TSource), (prevVal, newVal) => (!comparer.Equals(keySelector(prevVal), keySelector(newVal)), newVal), (_, newVal) => newVal)
.Where(t => t.Item1)
.Select(t => t.newVal)
.Timeout(timeout, scheduler)
.RetryWhen(ex => ex.OfType<TimeoutException>());
}
StateSelect
在这里做你想做的事:它维护一个状态(你之前在 prev
字段中拥有的状态)和 returns 前面提到的元组。它看起来像这样:
public static IObservable<TResult> StateSelect<TSource, TState, TResult>(this IObservable<TSource> source, TState initialState,
Func<TState, TSource, TResult> resultSelector, Func<TState, TSource, TState> stateSelector)
{
return source
.StateSelectMany(initialState, (state, item) => Observable.Return(resultSelector(state, item)), stateSelector);
}
public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(this IObservable<TSource> source, TState initialState,
Func<TState, TSource, IObservable<TResult>> resultSelector, Func<TState, TSource, TState> stateSelector)
{
return source
.Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
.SelectMany(t => t.Item2);
}
这里还有两个小问题:
Timeout
问题
- 如果第一个真值为
default(TSource)
,则使用 default(TSource)
作为初始状态可能会导致问题。
我们可以通过引入时间戳来解决这两个问题:
public static IObservable<TSource> ThrottleBy3<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.Timestamp(scheduler)
.StateSelect(Timestamped.Create(default(TSource), DateTimeOffset.MinValue),
(prevVal, newVal) => (!comparer.Equals(keySelector(prevVal.Value), keySelector(newVal.Value)) || newVal.Timestamp - prevVal.Timestamp > timeout, newVal),
(prevVal, newVal) => !comparer.Equals(keySelector(prevVal.Value), keySelector(newVal.Value)) || newVal.Timestamp - prevVal.Timestamp > timeout ? newVal : prevVal
)
.Where(t => t.Item1)
.Select(t => t.newVal.Value);
}
这里我们将时间戳值存储为状态,如果去抖动时间足够或值发生变化,我们会更改状态。结果再次是一个元组,指示该值是否应该继续,以及带时间戳的值。
希望这对您有所帮助。
感谢Shlomo的大力帮助,我想我现在已经很好地解决了这个问题:
public static IObservable<TSource> ThrottleBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.Publish(_val => Observable.Merge(
_val.Select(v => (value: v, timeout: false)),
_val.Select(v => (value: v, timeout: true)).Throttle(timeout, scheduler)
))
.Scan((prev: (value: (object)null, timeout: false), emit: (object)null), (state, t) => {
if (state.prev.value == null) // Initial state
return (t, null); // Save new state and ignore
// Emit previous in case of timeout or value changed
if (t.timeout || (!state.prev.timeout && !comparer.Equals(keySelector(t.value), keySelector((TSource)state.prev.value))))
return (t, state.prev.value);
// Save new state and ignore
return (t, null);
})
.Where(x => x.emit != null)
.Select(x => (TSource)x.emit);
}
这很像 Shlomo 的建议,但我最终使用 Throttle
而不是 Delay
,这使它变得更加简单。合并到辅助方法中以使其自包含。我将值装箱以避免 default(TSource)
.
我需要一个 RX 操作来消除流中元素的抖动,直到值发生变化。如果值在一段时间内没有变化,它还必须支持发出最后一个元素的超时。
t标记超时
DistinctUntilChanged
有点相似,但我想要最后一个相等的项目,而不是第一个。我尝试使用 Buffer
和 GroupBy
并选择组中的最后一个,但我需要在每个元素上重置计时器以确保在选择最后一个之前该组包含所有相等的元素。
我做了一个使用 Timeout
和 Retry
的实现,但我对每次超时都必须重新订阅源不太满意,因为这可能不适合所有 scenarios/sources(即冷可观察量)。不过,我测试过的热观测值似乎工作正常。
public static IObservable<TSource> ThrottleBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout, IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer ??= EqualityComparer<TKey>.Default;
scheduler ??= DefaultScheduler.Instance;
var prev = default(TSource);
return source
.StartWith(default(TSource))
.Select(e =>
{
var ret = !comparer.Equals(keySelector(prev), keySelector(e)) ? prev : default;
prev = e;
return ret;
})
.Where(e => !Equals(e, default(TSource)))
.Timeout(timeout, scheduler)
.RetryWhen(ex => ex.OfType<TimeoutException>());
}
由于 Throttle 在 Rx.NET 中的工作方式,决定将其称为 ThrottleBy 而不是 DebounceBy。
关于如何实施这样的操作 could/should 有什么想法吗?
编辑:感谢弹珠图。我把它变成了一些测试用例。你是对的,我以前的解决方案缺少计时器。我在这里补充说,他将每条消息加倍,但一个立即通过,另一个延迟。解决方法如下:
public static IObservable<TSource> ThrottleBy4<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.Timestamp(scheduler)
.Publish(_val => Observable.Merge( // For every incoming item, create two items: One immediate, one delayed by the timeout time.
_val.Select(v => (value: v, isOriginal: true)),
_val.Select(v => (value: v, isOriginal: false)).Delay(timeout, scheduler)
))
.StateSelect(Timestamped.Create(default(TSource), DateTimeOffset.MinValue),
(prevVal, t) => // Result function
{
// special handling for the initial state
if (prevVal.Timestamp == DateTimeOffset.MinValue)
return (prevVal, false);
if (t.isOriginal) // If an original value, only emit if the value changed.
return (prevVal, !comparer.Equals(keySelector(t.value.Value), keySelector(prevVal.Value)));
else // If a repeat value, only emit if the prevVal state is the same timestamp and value.
return (prevVal, comparer.Equals(keySelector(t.value.Value), keySelector(prevVal.Value)) && t.value.Timestamp == prevVal.Timestamp);
},
(prevVal, t) => t.isOriginal ? t.value : prevVal // State function. Only change state if the incoming item is an original value.
)
.Where(t => t.Item2)
.Select(t => t.Item1.Value);
}
测试代码如下:
TestScheduler ts = new TestScheduler();
var source = ts.CreateHotObservable<string>(
new Recorded<Notification<string>>(200.MsTicks(), Notification.CreateOnNext("A1")),
new Recorded<Notification<string>>(300.MsTicks(), Notification.CreateOnNext("A2")),
new Recorded<Notification<string>>(500.MsTicks(), Notification.CreateOnNext("B1")),
new Recorded<Notification<string>>(800.MsTicks(), Notification.CreateOnNext("B2"))
);
var comparer = new FirstLetterComparer();
var target = source
.ThrottleBy4(s => s, TimeSpan.FromSeconds(1), comparer: comparer, scheduler: ts);
var expectedResults = ts.CreateHotObservable<string>(
new Recorded<Notification<string>>(500.MsTicks(), Notification.CreateOnNext("A2")),
new Recorded<Notification<string>>(1800.MsTicks(), Notification.CreateOnNext("B2"))
);
var observer = ts.CreateObserver<string>();
target.Subscribe(observer);
ts.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
和这些帮手 类:
public class FirstLetterComparer : IEqualityComparer<string>
{
public bool Equals(string s1, string s2)
{
if (s1 == null && s2 == null)
return true;
if (s1 == null || s2 == null)
return false;
return (s1[0] == s2[0]);
}
public int GetHashCode(string s)
{
return s == null ? 0 : s[0].GetHashCode();
}
}
public static class X
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
}
上一个回答:
我发现您的解决方案有两个可能的问题(除了 Timeout
问题):
- 使用
default(T)
作为 token-value 有时会让您感到困惑。例如,这将不允许0
在IObservable<int>
中通过。 - 由于您使用字段
prev
,您可能遇到多重订阅问题。多个订阅者将共享该字段,这可能会导致竞争条件和不正确的行为。
您可以通过返回一个元组来解决这两个问题,一个值包含是否为新的布尔值,另一个值包含以下值:
public static IObservable<TSource> ThrottleBy2<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.StateSelect(default(TSource), (prevVal, newVal) => (!comparer.Equals(keySelector(prevVal), keySelector(newVal)), newVal), (_, newVal) => newVal)
.Where(t => t.Item1)
.Select(t => t.newVal)
.Timeout(timeout, scheduler)
.RetryWhen(ex => ex.OfType<TimeoutException>());
}
StateSelect
在这里做你想做的事:它维护一个状态(你之前在 prev
字段中拥有的状态)和 returns 前面提到的元组。它看起来像这样:
public static IObservable<TResult> StateSelect<TSource, TState, TResult>(this IObservable<TSource> source, TState initialState,
Func<TState, TSource, TResult> resultSelector, Func<TState, TSource, TState> stateSelector)
{
return source
.StateSelectMany(initialState, (state, item) => Observable.Return(resultSelector(state, item)), stateSelector);
}
public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(this IObservable<TSource> source, TState initialState,
Func<TState, TSource, IObservable<TResult>> resultSelector, Func<TState, TSource, TState> stateSelector)
{
return source
.Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
.SelectMany(t => t.Item2);
}
这里还有两个小问题:
Timeout
问题- 如果第一个真值为
default(TSource)
,则使用default(TSource)
作为初始状态可能会导致问题。
我们可以通过引入时间戳来解决这两个问题:
public static IObservable<TSource> ThrottleBy3<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.Timestamp(scheduler)
.StateSelect(Timestamped.Create(default(TSource), DateTimeOffset.MinValue),
(prevVal, newVal) => (!comparer.Equals(keySelector(prevVal.Value), keySelector(newVal.Value)) || newVal.Timestamp - prevVal.Timestamp > timeout, newVal),
(prevVal, newVal) => !comparer.Equals(keySelector(prevVal.Value), keySelector(newVal.Value)) || newVal.Timestamp - prevVal.Timestamp > timeout ? newVal : prevVal
)
.Where(t => t.Item1)
.Select(t => t.newVal.Value);
}
这里我们将时间戳值存储为状态,如果去抖动时间足够或值发生变化,我们会更改状态。结果再次是一个元组,指示该值是否应该继续,以及带时间戳的值。
希望这对您有所帮助。
感谢Shlomo的大力帮助,我想我现在已经很好地解决了这个问题:
public static IObservable<TSource> ThrottleBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.Publish(_val => Observable.Merge(
_val.Select(v => (value: v, timeout: false)),
_val.Select(v => (value: v, timeout: true)).Throttle(timeout, scheduler)
))
.Scan((prev: (value: (object)null, timeout: false), emit: (object)null), (state, t) => {
if (state.prev.value == null) // Initial state
return (t, null); // Save new state and ignore
// Emit previous in case of timeout or value changed
if (t.timeout || (!state.prev.timeout && !comparer.Equals(keySelector(t.value), keySelector((TSource)state.prev.value))))
return (t, state.prev.value);
// Save new state and ignore
return (t, null);
})
.Where(x => x.emit != null)
.Select(x => (TSource)x.emit);
}
这很像 Shlomo 的建议,但我最终使用 Throttle
而不是 Delay
,这使它变得更加简单。合并到辅助方法中以使其自包含。我将值装箱以避免 default(TSource)
.