在 .NET 中使用 Interlocked class 进行多线程处理的正确方法
Correct way to use the Interlocked class for multithreading in .NET
我有一个计数器,统计当前处理的大型报告
private int processedLargeReports;
我正在生成并启动五个线程,其中每个线程都访问此方法:
public bool GenerateReport(EstimatedReportSize reportSize)
{
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
bool allowLargeReports = (this.processedLargeReports < Settings.Default.LargeReportLimit);
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (reportOrderNextInQueue.IsProcessing)
{
Interlocked.Increment(ref this.processedLargeReports);
}
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (reportOrderNextInQueue.IsProcessing)
{
Interlocked.Decrement(ref this.processedLargeReports);
}
return works;
}
"reportOrderNextInQueue" 变量从数据库获取报告顺序并检查报告顺序是 "Normal" 还是 "Large"(这是通过定义 bool IsProcessing 属性 reportOrderNextInQueue 变量)。如果是大型报告,系统随后会互锁递增 processedLargeReport int 并处理大型报告。一旦处理完大报表,系统互锁将值递减。
整个想法是我一次只允许处理一个报表,所以一旦一个线程正在处理一个大报表,其他线程就不能访问数据库中的大报表. bool allowLargeReport 变量检查 processedLargeReports int 和是否超出限制。
我很好奇这是否是正确的实施方式,因为我无法在星期一之前对其进行测试。我不确定是否必须使用 InterLocked class 或仅将 processedLargeReports 变量定义为易失性成员。
假设您有 5 个线程开始执行上面的 运行 代码,并且 LargeReportLimit 为 1。它们都将 processedLargeReports 读取为 0,allowLargeReports 对它们来说为真,并且它们将同时开始处理 5 个项目,尽管你的限制是 1。所以我真的不明白这段代码是如何实现你的目标的,如果我理解正确的话。
稍微扩展一下:您 阅读 processedLargeReports 然后 act 它(用它来检查您是否应该允许报告到处理)。你表现得好像这个变量不能在阅读和行动之间改变,但事实并非如此。在您读取变量和对变量执行操作之间,任何数量的线程都可以使用 processedLargeReports 做任何事情,因为您没有锁定。在这种情况下互锁只会确保在所有线程处理完所有任务后,processedLargeReports 将始终为 0,仅此而已。
如果您需要限制对某些资源的并发访问 - 只需为此使用适当的工具:Semaphore 或 SemaphoreSlim classes。创建允许 LargeReportLimit 线程进入的信号量。在处理报告之前,等待您的信号量。如果达到并发线程处理报告的数量,这将阻塞。处理完成后,释放信号量以允许等待线程进入。这里不需要使用 Interlocked class。
volatile
不提供线程安全。与多线程一样,您需要一些同步 - 它可以基于 Interlocked
、lock
或任何其他同步原语,具体取决于您的需要。您选择了 Interlocked
- 很好,但是您有竞争条件。您读取任何同步块之外的 processedLargeReports
字段并根据该值做出决定。但它可能在您阅读后立即发生变化,因此整个逻辑将无法正常工作。正确的方法是始终执行 Interlocked.Increment
并将您的逻辑基于返回值。像这样:
首先,让字段使用更好的名称
private int processingLargeReports;
然后
public bool GenerateReport(EstimatedReportSize reportSize)
{
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
bool allowLargeReports =
(Interlocked.Increment(ref this.processingLargeReports) <= Settings.Default.LargeReportLimit);
if (!allowLargeReports)
Interlocked.Decrement(ref this.processingLargeReports);
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (allowLargeReports && !reportOrderNextInQueue.IsProcessing)
Interlocked.Decrement(ref this.processingLargeReports);
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (allowLargeReports && reportOrderNextInQueue.IsProcessing)
Interlocked.Decrement(ref this.processingLargeReports);
return works;
}
请注意,这也包含竞争条件,但会保留您的 LargeReportLimit 约束。
编辑: 现在我在想,因为你的处理是基于 Allow 和 Is 大报告,Interlocked
不是一个好的选择,最好使用基于 Monitor
的方法,例如:
private int processingLargeReports;
private object processingLargeReportsLock = new object();
private void AcquireProcessingLargeReportsLock(ref bool lockTaken)
{
Monitor.Enter(this.processingLargeReportsLock, ref lockTaken);
}
private void ReleaseProcessingLargeReportsLock(ref bool lockTaken)
{
if (!lockTaken) return;
Monitor.Exit(this.processingLargeReportsLock);
lockTaken = false;
}
public bool GenerateReport(EstimatedReportSize reportSize)
{
bool lockTaken = false;
try
{
this.AcquireProcessingLargeReportsLock(ref lockTaken);
bool allowLargeReports = (this.processingLargeReports < Settings.Default.LargeReportLimit);
if (!allowLargeReports)
{
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (reportOrderNextInQueue.IsProcessing)
{
this.processingLargeReports++;
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (reportOrderNextInQueue.IsProcessing)
{
this.AcquireProcessingLargeReportsLock(ref lockTaken);
this.processingLargeReports--;
}
return works;
}
finally
{
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
}
我有一个计数器,统计当前处理的大型报告
private int processedLargeReports;
我正在生成并启动五个线程,其中每个线程都访问此方法:
public bool GenerateReport(EstimatedReportSize reportSize)
{
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
bool allowLargeReports = (this.processedLargeReports < Settings.Default.LargeReportLimit);
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (reportOrderNextInQueue.IsProcessing)
{
Interlocked.Increment(ref this.processedLargeReports);
}
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (reportOrderNextInQueue.IsProcessing)
{
Interlocked.Decrement(ref this.processedLargeReports);
}
return works;
}
"reportOrderNextInQueue" 变量从数据库获取报告顺序并检查报告顺序是 "Normal" 还是 "Large"(这是通过定义 bool IsProcessing 属性 reportOrderNextInQueue 变量)。如果是大型报告,系统随后会互锁递增 processedLargeReport int 并处理大型报告。一旦处理完大报表,系统互锁将值递减。
整个想法是我一次只允许处理一个报表,所以一旦一个线程正在处理一个大报表,其他线程就不能访问数据库中的大报表. bool allowLargeReport 变量检查 processedLargeReports int 和是否超出限制。
我很好奇这是否是正确的实施方式,因为我无法在星期一之前对其进行测试。我不确定是否必须使用 InterLocked class 或仅将 processedLargeReports 变量定义为易失性成员。
假设您有 5 个线程开始执行上面的 运行 代码,并且 LargeReportLimit 为 1。它们都将 processedLargeReports 读取为 0,allowLargeReports 对它们来说为真,并且它们将同时开始处理 5 个项目,尽管你的限制是 1。所以我真的不明白这段代码是如何实现你的目标的,如果我理解正确的话。
稍微扩展一下:您 阅读 processedLargeReports 然后 act 它(用它来检查您是否应该允许报告到处理)。你表现得好像这个变量不能在阅读和行动之间改变,但事实并非如此。在您读取变量和对变量执行操作之间,任何数量的线程都可以使用 processedLargeReports 做任何事情,因为您没有锁定。在这种情况下互锁只会确保在所有线程处理完所有任务后,processedLargeReports 将始终为 0,仅此而已。
如果您需要限制对某些资源的并发访问 - 只需为此使用适当的工具:Semaphore 或 SemaphoreSlim classes。创建允许 LargeReportLimit 线程进入的信号量。在处理报告之前,等待您的信号量。如果达到并发线程处理报告的数量,这将阻塞。处理完成后,释放信号量以允许等待线程进入。这里不需要使用 Interlocked class。
volatile
不提供线程安全。与多线程一样,您需要一些同步 - 它可以基于 Interlocked
、lock
或任何其他同步原语,具体取决于您的需要。您选择了 Interlocked
- 很好,但是您有竞争条件。您读取任何同步块之外的 processedLargeReports
字段并根据该值做出决定。但它可能在您阅读后立即发生变化,因此整个逻辑将无法正常工作。正确的方法是始终执行 Interlocked.Increment
并将您的逻辑基于返回值。像这样:
首先,让字段使用更好的名称
private int processingLargeReports;
然后
public bool GenerateReport(EstimatedReportSize reportSize)
{
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
bool allowLargeReports =
(Interlocked.Increment(ref this.processingLargeReports) <= Settings.Default.LargeReportLimit);
if (!allowLargeReports)
Interlocked.Decrement(ref this.processingLargeReports);
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (allowLargeReports && !reportOrderNextInQueue.IsProcessing)
Interlocked.Decrement(ref this.processingLargeReports);
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (allowLargeReports && reportOrderNextInQueue.IsProcessing)
Interlocked.Decrement(ref this.processingLargeReports);
return works;
}
请注意,这也包含竞争条件,但会保留您的 LargeReportLimit 约束。
编辑: 现在我在想,因为你的处理是基于 Allow 和 Is 大报告,Interlocked
不是一个好的选择,最好使用基于 Monitor
的方法,例如:
private int processingLargeReports;
private object processingLargeReportsLock = new object();
private void AcquireProcessingLargeReportsLock(ref bool lockTaken)
{
Monitor.Enter(this.processingLargeReportsLock, ref lockTaken);
}
private void ReleaseProcessingLargeReportsLock(ref bool lockTaken)
{
if (!lockTaken) return;
Monitor.Exit(this.processingLargeReportsLock);
lockTaken = false;
}
public bool GenerateReport(EstimatedReportSize reportSize)
{
bool lockTaken = false;
try
{
this.AcquireProcessingLargeReportsLock(ref lockTaken);
bool allowLargeReports = (this.processingLargeReports < Settings.Default.LargeReportLimit);
if (!allowLargeReports)
{
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (reportOrderNextInQueue.IsProcessing)
{
this.processingLargeReports++;
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (reportOrderNextInQueue.IsProcessing)
{
this.AcquireProcessingLargeReportsLock(ref lockTaken);
this.processingLargeReports--;
}
return works;
}
finally
{
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
}