Rx.NET 取消限制

Rx.NET throttling with cancellation

我有一个类似于 this one 的问题。我需要按以下方式处理一系列用户输入事件(搜索):

除了取消之外,我的代码似乎几乎是这样工作的。

Observable.FromEventPattern<TextChangedEventArgs>(
    handler => SearchBox.TextChanged += handler,
    handler => SearchBox.TextChanged -= handler)
.ObserveOn(SynchronizationContext.Current)
.Select(GetSearchQuery)
.Throttle(TimeSpan.FromMilliseconds(MinimumSearchIntervalMiliseconds))
.DistinctUntilChanged()
.Subscribe(ExecuteSearch, () => { });

string GetSearchQuery(EventPattern<TextChangedEventArgs>) returns 搜索字符串和 void ExecuteSearch(string) 运行 搜索。

出于某种原因,我找不到 SO 上所有答案中提到的 Switch() 扩展...

我在 4.0.0

版本中使用 System.ReactiveSystem.Reactive.Linq

我猜想这种形式的Select()Subscribe()并不是上面代码中的最佳解决方案。他们可能应该在 Tasks...

上操作

知道如何改进上面的管道以支持按需取消吗?

我在这里猜测,但您可能 ExecuteSearch 发送搜索字符串,获取结果,然后将它们绑定到 UI。理想情况下,将 ExecuteSearch 拆分为 return IObservable<Results>Task<Results> 和一个新函数 public void ApplySearchResults(Results r) 来处理 UI 绑定。

一旦你有了它,这应该可以工作:

Observable.FromEventPattern<TextChangedEventArgs>(
    handler => SearchBox.TextChanged += handler,
    handler => SearchBox.TextChanged -= handler)
.ObserveOn(SynchronizationContext.Current)
.Select(GetSearchQuery)
.Throttle(TimeSpan.FromMilliseconds(MinimumSearchIntervalMiliseconds))
.DistinctUntilChanged()
.Select(ExecuteSearch /* .ToObservable() if it returns Task<Results>/*)
.Switch()
.Subscribe(ApplyResults, () => { })

.Switch 适用于 IObservable<IObservable<T>>。你没有双重可观察性,只有一个,这就是你没有看到它的原因。