创建具有固定大小或时间跨度的异步序列

Create asynchronous sequence with fixed size or timespan elapsed

下一个方法每秒调用多次,我必须在多个事件到达或时间跨度已过时通知订阅者:

public async Task SendEventsAsync(IEnumerable<Event> events)
{
    foreach(var event in events)
    {
        // Notify my subscriber
    }
}

该方法属于单例,将在整个应用程序生命周期中被调用。

我不想一次通知一个。我想在通知我的观察者之前等待大量此类数据(例如 10000 项),如果自上次通知以来已经过去了一定时间,也通知(例如每分钟)。实际上,我不需要将我的事件对象发送给订阅者,只需通知已收到一组新的事件即可。

我一直在尝试 Reactive Extensions,我想它可能对我有帮助:

var source = Observable.Interval(TimeSpan.FromMilliseconds(1));

var idealBatchSize = 10000;
var maxTimeDelay = TimeSpan.FromSeconds(60);
source
   .Buffer(maxTimeDelay, idealBatchSize)
   .Subscribe(buffer => 
        Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count));

此代码创建一个具有固定时间间隔的无限序列,然后投影到一个具有固定大小和最大延迟时间的缓冲区,这与我正在寻找的类似。但是我的序列应该从我的方法中获取,而不是固定的时间间隔。

我不知道我的作品应该如何组合在一起,我需要创建什么样的 Observable 序列。或者我可能错误地关注了这个问题。

一个带有间隔和静态计数器的序列是否足够?

我认为你的想法是正确的。我假设你有这个结构:

public class EventNotification { }

public class SingletonThing
{
    //don't need async for Rx
    public void SendEventsAsync(IEnumerable<EventNotification> events)
    {
        foreach (var element in events)
        {
        }
    }
}

您需要做的就是将事件通过管道传输到可观察对象中,然后您可以使用 Buffer 根据需要对它们进行批处理:

public class SingletonThing
{
    private readonly Subject<EventNotification> _subject = new Subject<EventNotification>();
    public void SendEventsAsync(IEnumerable<EventNotification> events)
    {
        foreach (var element in events)
        {
            _subject.OnNext(element);
        }
    }

    //not necessary to expose, but could be helpful
    public IObservable<EventNotification> AllEvents => _subject.AsObservable();

    private int idealBatchSize = 10000;
    private TimeSpan maxTimeDelay = TimeSpan.FromSeconds(60);
    public IObservable<IList<EventNotification>> BatchedEvents => _subject
        .Buffer(maxTimeDelay, idealBatchSize);
}

订阅代码将按如下方式调用:

SingletonThing.BatchedEvents.Subscribe(buffer =>
    Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count)
);