在 Producer/Consumer 中处理无限任务的最有效方法是什么?

What's the most efficient way to handle infinite tasks in Producer/Consumer?

我在云队列 (Azure) 中有数 GB 的数据(存储在消息中,每条消息大约 500KB)并且数据源源不断。

我需要对每条消息做一些处理。我决定创建 2 个后台工作程序,一个将数据存入内存,另一个处理该数据:

GetMessage(CloudQueue cloudQueue, LocalQueue localQueue)
{
    lock (localQueue)
    {
        localQueue.Enqueue(cloudQueue.Dequeue());
    }
}

ProcessMessage(LocalQueue localQueue)
{
    lock (localQueue)
    {
        Process(localQueue.Dequeue());
    }
}

问题是数据永远不会停止,所以我将花费大量时间来同步本地队列。是否有针对此类问题的已知模式?

处理时不需要持有锁

Item i;
lock (localQueue)
{
    i = localQueue.Dequeue();
}
Process(i);

因此应该没有什么争执。如有必要,通过批处理插入来降低生产者获取入队锁的频率:而不是队列持有单个项目,而是持有批次。您可以通过平均批量大小有效地减少锁的数量。您可以有一个简单的批处理模型,例如,每 10 个或按时间或时间和阈值的某种组合。