在 Producer/Consumer 中处理无限任务的最有效方法是什么?
What's the most efficient way to handle infinite tasks in Producer/Consumer?
我在云队列 (Azure) 中有数 GB 的数据(存储在消息中,每条消息大约 500KB)并且数据源源不断。
我需要对每条消息做一些处理。我决定创建 2 个后台工作程序,一个将数据存入内存,另一个处理该数据:
GetMessage(CloudQueue cloudQueue, LocalQueue localQueue)
{
lock (localQueue)
{
localQueue.Enqueue(cloudQueue.Dequeue());
}
}
ProcessMessage(LocalQueue localQueue)
{
lock (localQueue)
{
Process(localQueue.Dequeue());
}
}
问题是数据永远不会停止,所以我将花费大量时间来同步本地队列。是否有针对此类问题的已知模式?
处理时不需要持有锁
Item i;
lock (localQueue)
{
i = localQueue.Dequeue();
}
Process(i);
因此应该没有什么争执。如有必要,通过批处理插入来降低生产者获取入队锁的频率:而不是队列持有单个项目,而是持有批次。您可以通过平均批量大小有效地减少锁的数量。您可以有一个简单的批处理模型,例如,每 10 个或按时间或时间和阈值的某种组合。
我在云队列 (Azure) 中有数 GB 的数据(存储在消息中,每条消息大约 500KB)并且数据源源不断。
我需要对每条消息做一些处理。我决定创建 2 个后台工作程序,一个将数据存入内存,另一个处理该数据:
GetMessage(CloudQueue cloudQueue, LocalQueue localQueue)
{
lock (localQueue)
{
localQueue.Enqueue(cloudQueue.Dequeue());
}
}
ProcessMessage(LocalQueue localQueue)
{
lock (localQueue)
{
Process(localQueue.Dequeue());
}
}
问题是数据永远不会停止,所以我将花费大量时间来同步本地队列。是否有针对此类问题的已知模式?
处理时不需要持有锁
Item i;
lock (localQueue)
{
i = localQueue.Dequeue();
}
Process(i);
因此应该没有什么争执。如有必要,通过批处理插入来降低生产者获取入队锁的频率:而不是队列持有单个项目,而是持有批次。您可以通过平均批量大小有效地减少锁的数量。您可以有一个简单的批处理模型,例如,每 10 个或按时间或时间和阈值的某种组合。