创建具有固定大小或时间跨度的异步序列
Create asynchronous sequence with fixed size or timespan elapsed
下一个方法每秒调用多次,我必须在多个事件到达或时间跨度已过时通知订阅者:
public async Task SendEventsAsync(IEnumerable<Event> events)
{
foreach(var event in events)
{
// Notify my subscriber
}
}
该方法属于单例,将在整个应用程序生命周期中被调用。
我不想一次通知一个。我想在通知我的观察者之前等待大量此类数据(例如 10000 项),如果自上次通知以来已经过去了一定时间,也通知(例如每分钟)。实际上,我不需要将我的事件对象发送给订阅者,只需通知已收到一组新的事件即可。
我一直在尝试 Reactive Extensions,我想它可能对我有帮助:
var source = Observable.Interval(TimeSpan.FromMilliseconds(1));
var idealBatchSize = 10000;
var maxTimeDelay = TimeSpan.FromSeconds(60);
source
.Buffer(maxTimeDelay, idealBatchSize)
.Subscribe(buffer =>
Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count));
此代码创建一个具有固定时间间隔的无限序列,然后投影到一个具有固定大小和最大延迟时间的缓冲区,这与我正在寻找的类似。但是我的序列应该从我的方法中获取,而不是固定的时间间隔。
我不知道我的作品应该如何组合在一起,我需要创建什么样的 Observable 序列。或者我可能错误地关注了这个问题。
一个带有间隔和静态计数器的序列是否足够?
我认为你的想法是正确的。我假设你有这个结构:
public class EventNotification { }
public class SingletonThing
{
//don't need async for Rx
public void SendEventsAsync(IEnumerable<EventNotification> events)
{
foreach (var element in events)
{
}
}
}
您需要做的就是将事件通过管道传输到可观察对象中,然后您可以使用 Buffer
根据需要对它们进行批处理:
public class SingletonThing
{
private readonly Subject<EventNotification> _subject = new Subject<EventNotification>();
public void SendEventsAsync(IEnumerable<EventNotification> events)
{
foreach (var element in events)
{
_subject.OnNext(element);
}
}
//not necessary to expose, but could be helpful
public IObservable<EventNotification> AllEvents => _subject.AsObservable();
private int idealBatchSize = 10000;
private TimeSpan maxTimeDelay = TimeSpan.FromSeconds(60);
public IObservable<IList<EventNotification>> BatchedEvents => _subject
.Buffer(maxTimeDelay, idealBatchSize);
}
订阅代码将按如下方式调用:
SingletonThing.BatchedEvents.Subscribe(buffer =>
Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count)
);
下一个方法每秒调用多次,我必须在多个事件到达或时间跨度已过时通知订阅者:
public async Task SendEventsAsync(IEnumerable<Event> events)
{
foreach(var event in events)
{
// Notify my subscriber
}
}
该方法属于单例,将在整个应用程序生命周期中被调用。
我不想一次通知一个。我想在通知我的观察者之前等待大量此类数据(例如 10000 项),如果自上次通知以来已经过去了一定时间,也通知(例如每分钟)。实际上,我不需要将我的事件对象发送给订阅者,只需通知已收到一组新的事件即可。
我一直在尝试 Reactive Extensions,我想它可能对我有帮助:
var source = Observable.Interval(TimeSpan.FromMilliseconds(1));
var idealBatchSize = 10000;
var maxTimeDelay = TimeSpan.FromSeconds(60);
source
.Buffer(maxTimeDelay, idealBatchSize)
.Subscribe(buffer =>
Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count));
此代码创建一个具有固定时间间隔的无限序列,然后投影到一个具有固定大小和最大延迟时间的缓冲区,这与我正在寻找的类似。但是我的序列应该从我的方法中获取,而不是固定的时间间隔。
我不知道我的作品应该如何组合在一起,我需要创建什么样的 Observable 序列。或者我可能错误地关注了这个问题。
一个带有间隔和静态计数器的序列是否足够?
我认为你的想法是正确的。我假设你有这个结构:
public class EventNotification { }
public class SingletonThing
{
//don't need async for Rx
public void SendEventsAsync(IEnumerable<EventNotification> events)
{
foreach (var element in events)
{
}
}
}
您需要做的就是将事件通过管道传输到可观察对象中,然后您可以使用 Buffer
根据需要对它们进行批处理:
public class SingletonThing
{
private readonly Subject<EventNotification> _subject = new Subject<EventNotification>();
public void SendEventsAsync(IEnumerable<EventNotification> events)
{
foreach (var element in events)
{
_subject.OnNext(element);
}
}
//not necessary to expose, but could be helpful
public IObservable<EventNotification> AllEvents => _subject.AsObservable();
private int idealBatchSize = 10000;
private TimeSpan maxTimeDelay = TimeSpan.FromSeconds(60);
public IObservable<IList<EventNotification>> BatchedEvents => _subject
.Buffer(maxTimeDelay, idealBatchSize);
}
订阅代码将按如下方式调用:
SingletonThing.BatchedEvents.Subscribe(buffer =>
Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count)
);