带有参数 Func<TSource, IObservable<TThrottle>> throttleDurationSelector 的节流目的

Purpose of throttle with parameter Func<TSource, IObservable<TThrottle>> throttleDurationSelector

我找到的所有关于 rx.net 节流的文档都没有涵盖带有参数 Func<TSource, IObservable<TThrottle>> throttleDurationSelector 的过载。所以我只有 XML-评论。这表明,对源序列中的每个新元素调用 throttleDurationSelector。预期的 return 值将是 IObservable<TThrottle>。 (我的理解)这使得改变每个新元素的油门延迟成为可能。但是,这种理解与我发现的运行时体验不符。

var s = new Subject<string>();
s.Throttle(_ => Observable.Return(TimeSpan.FromMilliseconds(500))) // for simplicity of this demo, always return the same delay
    .Subscribe(_ => Console.WriteLine($"{DateTime.Now}.{DateTime.Now.Millisecond} event {_}"));

for (int i = 0; i < 5; i++)
    s.OnNext("a");

Thread.Sleep(1000);

for (int i = 0; i < 5; i++)
    s.OnNext("b");

Thread.Sleep(1000);

根据前面提到的理解(这显然是错误的),我本以为会有以下输出。

10.02.2022 11:47:54.386 event a
10.02.2022 11:47:55.388 event b

而是生成此输出。似乎根本没有应用节流。

10.02.2022 11:46:49.431 event a
10.02.2022 11:46:49.432 event a
10.02.2022 11:46:49.432 event a
10.02.2022 11:46:49.432 event a
10.02.2022 11:46:49.432 event a
10.02.2022 11:46:50.448 event b
10.02.2022 11:46:50.448 event b
10.02.2022 11:46:50.448 event b
10.02.2022 11:46:50.448 event b
10.02.2022 11:46:50.449 event b

如果不是动态改变油门延迟(这是我需要的),这个过载的目的是什么?

此外,为什么 Func<TSource, IObservable<TThrottle>> throttleDurationSelector 的 'complicated' 语法更简单的参数 Func<TSource, TThrottle> throttleDurationSelector 就足够了?

我们正在谈论 Throttle 运算符的重载:

public static IObservable<TSource> Throttle<TSource, TThrottle>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TThrottle>> throttleDurationSelector);

throttleDurationSelector 参数的目的是为您提供最大的灵活性来定义每个元素的限制持续时间。 TThrottle 的值并不重要。重要的是它何时发出一个元素(任何元素)或何时完成。这就是持续时间的定义方式,通过动态观察 TThrottle 序列,并等待它发出任何类型的通知。

回到您的代码,您将返回 Observable.Return 作为节流序列。此序列立即发出 TimeSpan 元素,导致源 s 序列的关联 string 元素也立即发出。结果是没有发生节流。它等效于使用 TimeSpan.Zero 参数进行限制,或者完全删除 Throttle 运算符。

要修复您的代码,您必须将 Observable.Return 替换为一个延迟发射元素的序列,例如 Observable.Timer 序列。因此,如果您在收到 TSource 元素后立即知道节流持续时间,并且为简单起见,它始终为 500 毫秒,您可以将其替换为:

s.Throttle(_ => Observable.Return(TimeSpan.FromMilliseconds(500)))

...有了这个:

s.Throttle(_ => Observable.Timer(TimeSpan.FromMilliseconds(500)))