C# - Lock 不起作用但 Volatile AND Lock 起作用?
C# - Lock not working but Volatile AND Lock does?
我正在尝试尽可能快速高效地轮询 API 以获得市场数据。 API 允许您根据请求从 batchSize 个市场获取市场数据。 API 允许您有 3 个并发请求但不能再多(或抛出错误)。
我可能从 batchSize 个以上的不同市场请求数据。
我不断循环遍历所有市场,分批请求数据,每个线程一批,随时 3 个线程。
市场总数(以及批次)可以随时更改。
我正在使用以下代码:
private static object lockObj = new object();
private void PollMarkets()
{
const int NumberOfConcurrentRequests = 3;
for (int i = 0; i < NumberOfConcurrentRequests; i++)
{
int batch = 0;
Task.Factory.StartNew(async () =>
{
while (true)
{
if (markets.Count > 0)
{
List<string> batchMarketIds;
lock (lockObj)
{
var numBatches = (int)Math.Ceiling((double)markets.Count / batchSize);
batchMarketIds = markets.Keys.Skip(batch*batchSize).Take(batchSize).ToList();
batch = (batch + 1) % numBatches;
}
var marketData = await GetMarketData(batchMarketIds);
// Do something with marketData
}
else
{
await Task.Delay(1000); // wait for some markets to be added.
}
}
}
});
}
}
即使在临界区有锁,每个线程都是从 batch = 0 开始的(每个线程经常轮询重复数据)。
如果我将 batch 更改为私有 volatile 字段,上面的代码将按我希望的方式工作(volatile 和锁定)。
所以出于某种原因我的锁不起作用?我觉得这很明显,但我想念它。
我认为这里最好使用锁而不是 volatile 字段,这也是正确的吗?
谢谢
问题是您在 for 循环内定义了批处理变量。这意味着线程正在使用它们自己的变量而不是共享它。
在我看来,您应该使用 Queue<> 来创建作业管道。
像这样
private int batchSize = 10;
private Queue<int> queue = new Queue<int>();
private void AddMarket(params int[] marketIDs)
{
lock (queue)
{
foreach (var marketID in marketIDs)
{
queue.Enqueue(marketID);
}
if (queue.Count >= batchSize)
{
Monitor.Pulse(queue);
}
}
}
private void Start()
{
for (var tid = 0; tid < 3; tid++)
{
Task.Run(async () =>
{
while (true)
{
List<int> toProcess;
lock (queue)
{
if (queue.Count < batchSize)
{
Monitor.Wait(queue);
continue;
}
toProcess = new List<int>(batchSize);
for (var count = 0; count < batchSize; count++)
{
toProcess.Add(queue.Dequeue());
}
if (queue.Count >= batchSize)
{
Monitor.Pulse(queue);
}
}
var marketData = await GetMarketData(toProcess);
}
});
}
}
我正在尝试尽可能快速高效地轮询 API 以获得市场数据。 API 允许您根据请求从 batchSize 个市场获取市场数据。 API 允许您有 3 个并发请求但不能再多(或抛出错误)。
我可能从 batchSize 个以上的不同市场请求数据。
我不断循环遍历所有市场,分批请求数据,每个线程一批,随时 3 个线程。
市场总数(以及批次)可以随时更改。
我正在使用以下代码:
private static object lockObj = new object();
private void PollMarkets()
{
const int NumberOfConcurrentRequests = 3;
for (int i = 0; i < NumberOfConcurrentRequests; i++)
{
int batch = 0;
Task.Factory.StartNew(async () =>
{
while (true)
{
if (markets.Count > 0)
{
List<string> batchMarketIds;
lock (lockObj)
{
var numBatches = (int)Math.Ceiling((double)markets.Count / batchSize);
batchMarketIds = markets.Keys.Skip(batch*batchSize).Take(batchSize).ToList();
batch = (batch + 1) % numBatches;
}
var marketData = await GetMarketData(batchMarketIds);
// Do something with marketData
}
else
{
await Task.Delay(1000); // wait for some markets to be added.
}
}
}
});
}
}
即使在临界区有锁,每个线程都是从 batch = 0 开始的(每个线程经常轮询重复数据)。
如果我将 batch 更改为私有 volatile 字段,上面的代码将按我希望的方式工作(volatile 和锁定)。
所以出于某种原因我的锁不起作用?我觉得这很明显,但我想念它。
我认为这里最好使用锁而不是 volatile 字段,这也是正确的吗?
谢谢
问题是您在 for 循环内定义了批处理变量。这意味着线程正在使用它们自己的变量而不是共享它。
在我看来,您应该使用 Queue<> 来创建作业管道。
像这样
private int batchSize = 10;
private Queue<int> queue = new Queue<int>();
private void AddMarket(params int[] marketIDs)
{
lock (queue)
{
foreach (var marketID in marketIDs)
{
queue.Enqueue(marketID);
}
if (queue.Count >= batchSize)
{
Monitor.Pulse(queue);
}
}
}
private void Start()
{
for (var tid = 0; tid < 3; tid++)
{
Task.Run(async () =>
{
while (true)
{
List<int> toProcess;
lock (queue)
{
if (queue.Count < batchSize)
{
Monitor.Wait(queue);
continue;
}
toProcess = new List<int>(batchSize);
for (var count = 0; count < batchSize; count++)
{
toProcess.Add(queue.Dequeue());
}
if (queue.Count >= batchSize)
{
Monitor.Pulse(queue);
}
}
var marketData = await GetMarketData(toProcess);
}
});
}
}