C# .NET - 缓冲消息 w/Timer

C# .NET - Buffer messages w/Timer

我需要实现一个同样基于时间的消息缓冲系统。

我需要做的是存储我的 class 实例,然后在我达到 100 个实例或 1 分钟过去后将它们发送出去。

基本上:

List<Message> messages;

public void GotNewMessage(Message msg)
{
    messages.add(msg);

    if (messages.count() == 100 || timer.elapsed(1 minute))
    {
        SendMessages(messages);
        messages.clear()
    }
}

我似乎无法弄清楚如何在不过度使用锁的情况下实现这一点,这会大大减慢进程。有谁知道实施这种系统的好方法?提前致谢。

首先,您应该考虑使用 ConcurrentQueue<> 代替 List<>ConcurrentQueue<> 始终是线程安全的,不需要额外的锁。有了这个,您就已经为消息队列节省了一把锁。 Interlocked 提供原子性,当它不可用时。

根据 C# language specification,独立的 reads/writes 是原子的(但仅对于某些数据类型和 long 并不总是原子的 - 这就是我转移的原因DateTime.Now.Ticks 获得 int32 而不会丢失任何会影响经过时间的位)和读-修改-写(例如 ++i)永远不是原子的。

移位(例如 <<)本身是原子的,不需要任何额外的锁定。

private ConcurrentQueue<Message> Queue = new ConcurrentQueue<Message>();
private int QueueSize = 0;
private int LastSend = (int)(DateTime.Now.Ticks >> 23);
private int LastMessage = (int)(DateTime.Now.Ticks >> 23);

public void GotNewMessage(Message Message)
{
    Queue.Enqueue(Message);

    Interlocked.Increment(ref QueueSize);
    Interlocked.Exchange(ref LastMessage, (int)(DateTime.Now.Ticks >> 23));

    if (Interlocked.CompareExchange(ref QueueSize, 0, 100) >= 100 || 
        LastMessage - LastSend >= 60)
    {
        Message Dummy;
        while (!Queue.IsEmpty)
            if (Queue.TryDequeue(out Dummy))
                SendMessage(Dummy);

        Interlocked.Exchange(ref LastSend, (int)(DateTime.Now.Ticks >> 23));
    }
}

public void SendMessage(Message Message)
{
    // ...
}

编辑: 可能会发送超过 100 条消息。如果你希望发送严格的 100 条消息,你可以在循环中实现另一个原子增量。

有一个很棒的库可以满足这些需求(将时间与序列结合起来),它就是 Reactive Extensions。参见 https://github.com/Reactive-Extensions/Rx.NET

你可以这样写

void Main()
{
    messages
        .Buffer(TimeSpan.FromMinutes(1), 100) // Buffer until 100 items or 1 minute has elapsed, whatever comes first.
        .Subscribe(msgs => SendMessages(msgs));     
}

Subject<Message> messages = new Subject<Message>();

public void GotNewMessage(Message msg)
{
    messages.OnNext(msg);
}

注意:这还没有准备好生产,但它展示了如何做的基础知识。根据您获取消息的位置,有更好的方法来创建要订阅的 Observable。

更多参考资料:

如果您的消息是使用事件接收的,您可以 link 将事件发送到 RX 流,请参阅 https://msdn.microsoft.com/en-us/library/hh242978(v=vs.103).aspx and https://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.fromeventpattern(v=vs.103).aspx