在 .NET 中使用 Interlocked class 进行多线程处理的正确方法

Correct way to use the Interlocked class for multithreading in .NET

我有一个计数器,统计当前处理的大型报告

private int processedLargeReports;

我正在生成并启动五个线程,其中每个线程都访问此方法:

public bool GenerateReport(EstimatedReportSize reportSize)
{
    var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
    bool allowLargeReports = (this.processedLargeReports < Settings.Default.LargeReportLimit);
    var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
        currentDateTime.AddHours(
        this.timeoutValueInHoursBeforeReleaseLock), 
        reportSize, 
        CorrelationIdForPickingReport, 
        allowLargeReports);

    if (reportOrderNextInQueue.IsProcessing)
    {
        Interlocked.Increment(ref this.processedLargeReports);                
    }

    var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);

    var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
    if (reportOrderNextInQueue.IsProcessing)
    {
        Interlocked.Decrement(ref this.processedLargeReports);                
    }
    return works;           
}

"reportOrderNextInQueue" 变量从数据库获取报告顺序并检查报告顺序是 "Normal" 还是 "Large"(这是通过定义 bool IsProcessing 属性 reportOrderNextInQueue 变量)。如果是大型报告,系统随后会互锁递增 processedLargeReport int 并处理大型报告。一旦处理完大报表,系统互锁将值递减。

整个想法是我一次只允许处理一个报表,所以一旦一个线程正在处理一个大报表,其他线程就不能访问数据库中的大报表. bool allowLargeReport 变量检查 processedLargeReports int 和是否超出限制。

我很好奇这是否是正确的实施方式,因为我无法在星期一之前对其进行测试。我不确定是否必须使用 InterLocked class 或仅将 processedLargeReports 变量定义为易失性成员。

假设您有 5 个线程开始执行上面的 运行 代码,并且 LargeReportLimit 为 1。它们都将 processedLargeReports 读取为 0,allowLargeReports 对它们来说为真,并且它们将同时开始处理 5 个项目,尽管你的限制是 1。所以我真的不明白这段代码是如何实现你的目标的,如果我理解正确的话。

稍微扩展一下:您 阅读 processedLargeReports 然后 act 它(用它来检查您是否应该允许报告到处理)。你表现得好像这个变量不能在阅读和行动之间改变,但事实并非如此。在您读取变量和对变量执行操作之间,任何数量的线程都可以使用 processedLargeReports 做任何事情,因为您没有锁定。在这种情况下互锁只会确保在所有线程处理完所有任务后,processedLargeReports 将始终为 0,仅此而已。

如果您需要限制对某些资源的并发访问 - 只需为此使用适当的工具:Semaphore 或 SemaphoreSlim classes。创建允许 LargeReportLimit 线程进入的信号量。在处理报告之前,等待您的信号量。如果达到并发线程处理报告的数量,这将阻塞。处理完成后,释放信号量以允许等待线程进入。这里不需要使用 Interlocked class。

volatile 不提供线程安全。与多线程一样,您需要一些同步 - 它可以基于 Interlockedlock 或任何其他同步原语,具体取决于您的需要。您选择了 Interlocked - 很好,但是您有竞争条件。您读取任何同步块之外的 processedLargeReports 字段并根据该值做出决定。但它可能在您阅读后立即发生变化,因此整个逻辑将无法正常工作。正确的方法是始终执行 Interlocked.Increment 并将您的逻辑基于返回值。像这样:

首先,让字段使用更好的名称

private int processingLargeReports;

然后

public bool GenerateReport(EstimatedReportSize reportSize)
{
    var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
    bool allowLargeReports = 
       (Interlocked.Increment(ref this.processingLargeReports) <= Settings.Default.LargeReportLimit);
    if (!allowLargeReports)
        Interlocked.Decrement(ref this.processingLargeReports);
    var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
        currentDateTime.AddHours(
        this.timeoutValueInHoursBeforeReleaseLock), 
        reportSize, 
        CorrelationIdForPickingReport, 
        allowLargeReports);
    if (allowLargeReports && !reportOrderNextInQueue.IsProcessing)
        Interlocked.Decrement(ref this.processingLargeReports);

    var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);

    var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
    if (allowLargeReports && reportOrderNextInQueue.IsProcessing)
        Interlocked.Decrement(ref this.processingLargeReports);
    return works;           
}

请注意,这也包含竞争条件,但会保留您的 LargeReportLimit 约束。

编辑: 现在我在想,因为你的处理是基于 AllowIs 大报告,Interlocked 不是一个好的选择,最好使用基于 Monitor 的方法,例如:

private int processingLargeReports;
private object processingLargeReportsLock = new object();

private void AcquireProcessingLargeReportsLock(ref bool lockTaken)
{
    Monitor.Enter(this.processingLargeReportsLock, ref lockTaken); 
}

private void ReleaseProcessingLargeReportsLock(ref bool lockTaken)
{
    if (!lockTaken) return;
    Monitor.Exit(this.processingLargeReportsLock);
    lockTaken = false;
}

public bool GenerateReport(EstimatedReportSize reportSize)
{
    bool lockTaken = false;
    try
    {
        this.AcquireProcessingLargeReportsLock(ref lockTaken); 
        bool allowLargeReports = (this.processingLargeReports < Settings.Default.LargeReportLimit);
        if (!allowLargeReports)
        {
            this.ReleaseProcessingLargeReportsLock(ref lockTaken);
        }
        var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
        var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
            currentDateTime.AddHours(
            this.timeoutValueInHoursBeforeReleaseLock), 
            reportSize, 
            CorrelationIdForPickingReport, 
            allowLargeReports);
        if (reportOrderNextInQueue.IsProcessing)
        {
            this.processingLargeReports++;
            this.ReleaseProcessingLargeReportsLock(ref lockTaken);
        }            
        var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
        var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
        if (reportOrderNextInQueue.IsProcessing)
        {
            this.AcquireProcessingLargeReportsLock(ref lockTaken); 
            this.processingLargeReports--;
        }            
        return works;
    }
    finally
    {
        this.ReleaseProcessingLargeReportsLock(ref lockTaken);
    }           
}