去抖直到 Rx.Net
Debounce Until in Rx.Net
我有 2 个事件流:
1.鼠标拖放事件流(拖开始...拖结束...拖开始...拖结束)
2.按键事件流('a' ... 'b' .... 'c' .... 'd')
我需要合并成一个只包含来自第二个流的事件的流(所以只有按键),但它需要过滤掉在拖动开始和拖动结束之间发生的所有按键,除了最后一个.
所以如果来源是这样的:
... Start ............... End .............. Start .............. End
和
...........'a'...'b'...'c'.......'d'...'e'..........'f'....'g'.......
结果应该是这样的:
...........................'c'...'d'...'e'..........................'g'
在 C# 中使用 Rx.net 可以实现类似的功能吗?
答案是肯定的。先回答再解释:
public static class X
{
public static IObservable<T> GatedDebounce<T>(this IObservable<T> source, IObservable<bool> gating)
{
var finalStream = gating
.StartWith(false)
.DistinctUntilChanged()
.Publish(_gating => source.Publish(_source => Observable.Merge(
_source
.Window(_gating.Where(b => b), _ => _gating.Where(b => !b))
.SelectMany(o => o.LastAsync()),
_source
.Window(_gating.Where(b => !b), _ => _gating.Where(b => b))
.Merge()
)));
return finalStream;
}
}
然后,给定一个 IObservable<T>
表示您的值,以及一个 IObservable<bool>
表示拖动开始和停止的位置(true 表示拖动开始,false 表示拖动结束),您可以调用它像这样:
var throttledStream= valueStream.GatedDebounce(gateStream);
解释:
为了更好地理解它,让我们抛出 Publish
调用,并将其分解为多个部分:
片段 1,
source
.Window(gating.Where(b => b), _ => gating.Where(b => !b))
.SelectMany(o => o.LastAsync())
这个 Window
函数意味着调用意味着我们在 gating 发出 true 时启动一个子集 observable(或 window),并在 gating 发出 false 时结束 window。从那个 window,我们 select 最后一个项目,如果它存在的话。这只会在 window 关闭时发出。
片段 2,
source
.Window(gating.Where(b => !b), _ => gating.Where(b => b))
.Merge() //Equivalent to .SelectMany(o => o) if you prefer
这个 Window
函数做相反的事情:每当 gating 发出 false 时启动 window,并在 gating 发出 true 时结束它。从那个 window 我们在它到达时发出一切。
将这两个与 Merge
放在一起,您就获得了 90% 的解决方案。其余:
.StartWith(false)
是为了确保我们在您最初启动 observable 时打开一个 window,否则在第一个门控项之前发生的值将丢失。
DistintUntilChanged()
是确保我们的门是 t、f、t、f 并且绝不会连续出现两个相同值的廉价方法,这会导致两个同时的 windows打开。
Publish
调用是防止多次订阅的好习惯。您可以在此处的其他一些问答中找到更好的解释。
我有 2 个事件流: 1.鼠标拖放事件流(拖开始...拖结束...拖开始...拖结束) 2.按键事件流('a' ... 'b' .... 'c' .... 'd')
我需要合并成一个只包含来自第二个流的事件的流(所以只有按键),但它需要过滤掉在拖动开始和拖动结束之间发生的所有按键,除了最后一个.
所以如果来源是这样的:
... Start ............... End .............. Start .............. End
和
...........'a'...'b'...'c'.......'d'...'e'..........'f'....'g'.......
结果应该是这样的:
...........................'c'...'d'...'e'..........................'g'
在 C# 中使用 Rx.net 可以实现类似的功能吗?
答案是肯定的。先回答再解释:
public static class X
{
public static IObservable<T> GatedDebounce<T>(this IObservable<T> source, IObservable<bool> gating)
{
var finalStream = gating
.StartWith(false)
.DistinctUntilChanged()
.Publish(_gating => source.Publish(_source => Observable.Merge(
_source
.Window(_gating.Where(b => b), _ => _gating.Where(b => !b))
.SelectMany(o => o.LastAsync()),
_source
.Window(_gating.Where(b => !b), _ => _gating.Where(b => b))
.Merge()
)));
return finalStream;
}
}
然后,给定一个 IObservable<T>
表示您的值,以及一个 IObservable<bool>
表示拖动开始和停止的位置(true 表示拖动开始,false 表示拖动结束),您可以调用它像这样:
var throttledStream= valueStream.GatedDebounce(gateStream);
解释:
为了更好地理解它,让我们抛出 Publish
调用,并将其分解为多个部分:
片段 1,
source
.Window(gating.Where(b => b), _ => gating.Where(b => !b))
.SelectMany(o => o.LastAsync())
这个 Window
函数意味着调用意味着我们在 gating 发出 true 时启动一个子集 observable(或 window),并在 gating 发出 false 时结束 window。从那个 window,我们 select 最后一个项目,如果它存在的话。这只会在 window 关闭时发出。
片段 2,
source
.Window(gating.Where(b => !b), _ => gating.Where(b => b))
.Merge() //Equivalent to .SelectMany(o => o) if you prefer
这个 Window
函数做相反的事情:每当 gating 发出 false 时启动 window,并在 gating 发出 true 时结束它。从那个 window 我们在它到达时发出一切。
将这两个与 Merge
放在一起,您就获得了 90% 的解决方案。其余:
.StartWith(false)
是为了确保我们在您最初启动 observable 时打开一个 window,否则在第一个门控项之前发生的值将丢失。DistintUntilChanged()
是确保我们的门是 t、f、t、f 并且绝不会连续出现两个相同值的廉价方法,这会导致两个同时的 windows打开。Publish
调用是防止多次订阅的好习惯。您可以在此处的其他一些问答中找到更好的解释。