没有对订阅者的空调用的 Rx 缓冲区
Rx Buffer without empty calls to subscriber
在我使用 .Net 4.6 的 WPF 应用程序中,我有一个事件以高速率(每秒数百个)触发新数据点,但并非始终如此。此数据显示在图表中。
我想每 50 毫秒更新一次图表,而不是在每个新数据点之后。
为了实现这一点,我虽然使用了 Rx 中的 Buffer(TimeSpan.FromMilliseconds(50))
,理论上它工作正常。但是如果没有创建新的数据点,我的订阅者也会每 50 毫秒被调用一次,这不是我想要的。
我创建了一个小示例应用程序来测试它:
using System;
using System.Reactive.Linq;
namespace RxTester
{
public class Program
{
private static event EventHandler TheEvent;
static void Main(string[] args)
{
var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h);
var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000))
.Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received..."));
var random = new Random();
var timer = new System.Timers.Timer(2000)
{
AutoReset = true,
Enabled = true
};
timer.Elapsed += (s, e) =>
{
var amount = random.Next(1, 10);
for (int i = 0; i < amount; ++i)
TheEvent?.Invoke(null, null);
};
Console.ReadLine();
timer.Enabled = false;
subscriber.Dispose();
}
}
}
您需要将 "Rx-Linq" NuGet 包添加到 运行 或使用以下 Fiddle: https://dotnetfiddle.net/TV5tD4
您会看到几个“收到 0 个元素”,这是我想避免的。我知道我可以简单地检查 e.Count == 0
,但是当我使用多个这样的缓冲区时,这对我来说似乎不是最佳选择。
有没有办法只在元素可用时创建新的缓冲元素块?
我也愿意使用其他方法来解决我的按时间批处理事件的问题——我已经研究过 TPL 数据流 BatchBlock
,但它似乎只支持基于计数的块大小。
我们可以再次使用强大的GroupByUntil
方法来创建这个扩展
public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>
(this IObservable<TSource> source,
TimeSpan threshold)
{
return source.Publish( sp =>
sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold))
.SelectMany(i => i.ToList()));
}
执行此操作的标准方法很简单
.Buffer(period)
.Where(buffer=>buffer.Any())
如此有效地做您想避免的事 (count==0)
。然而,这张支票非常便宜,我想如果比其他涉及的成本便宜得多,即调度。唯一的问题可能是正在发生的分配量(每 50 毫秒创建一个 List<T>
),然后是即将到来的 GC Gen0 压力可能会增加。
在我使用 .Net 4.6 的 WPF 应用程序中,我有一个事件以高速率(每秒数百个)触发新数据点,但并非始终如此。此数据显示在图表中。
我想每 50 毫秒更新一次图表,而不是在每个新数据点之后。
为了实现这一点,我虽然使用了 Rx 中的 Buffer(TimeSpan.FromMilliseconds(50))
,理论上它工作正常。但是如果没有创建新的数据点,我的订阅者也会每 50 毫秒被调用一次,这不是我想要的。
我创建了一个小示例应用程序来测试它:
using System;
using System.Reactive.Linq;
namespace RxTester
{
public class Program
{
private static event EventHandler TheEvent;
static void Main(string[] args)
{
var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h);
var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000))
.Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received..."));
var random = new Random();
var timer = new System.Timers.Timer(2000)
{
AutoReset = true,
Enabled = true
};
timer.Elapsed += (s, e) =>
{
var amount = random.Next(1, 10);
for (int i = 0; i < amount; ++i)
TheEvent?.Invoke(null, null);
};
Console.ReadLine();
timer.Enabled = false;
subscriber.Dispose();
}
}
}
您需要将 "Rx-Linq" NuGet 包添加到 运行 或使用以下 Fiddle: https://dotnetfiddle.net/TV5tD4
您会看到几个“收到 0 个元素”,这是我想避免的。我知道我可以简单地检查 e.Count == 0
,但是当我使用多个这样的缓冲区时,这对我来说似乎不是最佳选择。
有没有办法只在元素可用时创建新的缓冲元素块?
我也愿意使用其他方法来解决我的按时间批处理事件的问题——我已经研究过 TPL 数据流 BatchBlock
,但它似乎只支持基于计数的块大小。
我们可以再次使用强大的GroupByUntil
方法来创建这个扩展
public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>
(this IObservable<TSource> source,
TimeSpan threshold)
{
return source.Publish( sp =>
sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold))
.SelectMany(i => i.ToList()));
}
执行此操作的标准方法很简单
.Buffer(period)
.Where(buffer=>buffer.Any())
如此有效地做您想避免的事 (count==0)
。然而,这张支票非常便宜,我想如果比其他涉及的成本便宜得多,即调度。唯一的问题可能是正在发生的分配量(每 50 毫秒创建一个 List<T>
),然后是即将到来的 GC Gen0 压力可能会增加。