C# Parallel.For not executing every step

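I have been working on a mock-up for an import service that currently runs sequentially. However, my mock-up seems to have a strange problem: sometimes one or two items in the for loop are not executed.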

class Service
{
    private Thread _worker;
    private bool _stopping;        
    private CancellationTokenSource _cts;
    private ParallelOptions _po;
    private Repository _repository;

    public void Start(Repository repository)
    {
        _repository = repository;
        _cts = new CancellationTokenSource();            
        _po = new ParallelOptions { 
            CancellationToken = _cts.Token
        };

        _worker = new Thread(ProcessImport);
        _worker.Start();            
    }

    public void Stop()
    {
        _stopping = true;
        _cts.Cancel();
        if(_worker != null && _worker.IsAlive)
            _worker.Join();            
    }

    private void ProcessImport()
    {
        while (!_stopping)
        {
            var import = _repository.GetInProgressImport();
            if (import == null)
            {
                Thread.Sleep(1000);
                continue;
            }

            try
            {
                Parallel.For(0, 1000, _po, i => Work.DoWork(i, import, _cts.Token, _repository));
            }
            catch (OperationCanceledException)
            {
                // Unmark batch so it can be started again
                var batch = _repository.GetBatch(import.BatchId);
                batch.Processing = false;
                _repository.UpdateBatch(batch);
                Console.WriteLine("Aborted import {0}", import.ImportId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Something went wrong: {0}", ex.Message);
            }         
        }         
    }
}

class Work
{
    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {         
        // Simulate doing some work
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);

        // Update the batch
        var batch = repository.GetBatch(import.BatchId);
        batch.Processed++;
        if (batch.Processed == batch.Total)
        {
            batch.Finished = DateTime.Now;
            batch.Processing = false;                
        }            
        repository.UpdateBatch(batch);            
    }

    private static void HandleAbort(CancellationToken ct)
    {
        if (!ct.IsCancellationRequested) 
            return;
        ct.ThrowIfCancellationRequested();
    }
}

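With this code, I often find that the batch never completes and batch.Processed = 999 or 998.

Can anyone shed some light on what I'm doing wrong?

Thanks in advance.

EDIT:

To be clear about the repository/batch objects: I believe my current mock-up is thread-safe.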

class Repository
{
    private ConcurrentBag<Batch> _batchData = new ConcurrentBag<Batch>();
    private ConcurrentBag<Import> _importData = new ConcurrentBag<Import>();

    public void CreateImport(Import import)
    {
        _importData.Add(import);
    }

    public Import GetInProgressImport()
    {
        var import = _importData
            .Join(_batchData, i => i.BatchId, b => b.BatchId, (i, b) => new
            {
                Import = i,
                Batch = b
            })
            .Where(j => j.Batch.Processed < j.Batch.Total && !j.Batch.Processing)
            .OrderByDescending(j => j.Batch.Total - j.Batch.Processed)
            .ThenBy(j => j.Batch.BatchId)
            .Select(j => j.Import)                
            .FirstOrDefault();

        if (import == null)
            return null;

        // mark the batch as processing
        var batch = GetBatch(import.BatchId);
        batch.Processing = true;
        UpdateBatch(batch);

        return import;
    }

    public List<Import> ListImports()
    {
        return _importData.ToList();
    }

    public void CreateBatch(Batch batch)
    {
        _batchData.Add(batch);
    }

    public Batch GetBatch(Int64 batchId)
    {
        return _batchData.FirstOrDefault(b => b.BatchId == batchId);
    }

    public void UpdateBatch(Batch batch)
    {
        var batchData = _batchData.First(b => b.BatchId == batch.BatchId);
        batchData.Total = batch.Total;
        batchData.Processed = batch.Processed;
        batchData.Started = batch.Started;
        batchData.Finished = batch.Finished;
        batchData.Processing = batch.Processing;
    }
}

class Import
{
    public Int64 ImportId { get; set; }
    public Int64 BatchId { get; set; }
}

class Batch
{
    public Int64 BatchId { get; set; }
    public int Total { get; set; }
    public int Processed { get; set; }
    public DateTime Created { get; set; }
    public DateTime Started { get; set; }
    public DateTime Finished { get; set; }   
    public bool Processing { get; set; }   
}

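This is only a mock-up, so there is no database or other persistence behind my repository.

Also, I'm not comparing my batch against the value of i, but against the number of loop iterations (the work actually completed), as indicated by the Processed property of the batch object.

Thanks

Solution:

I had forgotten that the batch update needs to be synchronized. It should look like this: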

class Work
{
    private static readonly object _sync = new object();

    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {       
        // Do work            
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);

        lock (_sync)
        {
            // Update the batch
            var batch = repository.GetBatch(import.BatchId);
            batch.Processed++;
            if (batch.Processed == batch.Total)
            {
                batch.Finished = DateTime.Now;
                batch.Processing = false;
            }
            repository.UpdateBatch(batch);
        }
    }

    private static void HandleAbort(CancellationToken ct)
    {
        if (!ct.IsCancellationRequested) 
            return;
        ct.ThrowIfCancellationRequested();
    }
}

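According to MSDN, the Parallel.For overloads specify the second integer as toExclusive, meaning the loop runs up to, but does not include, that value. In other words, 999 is the last value of i you should expect, not 1000. Note, though, that because the loop starts at 0, it does execute 1,000 times.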
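To make the toExclusive behaviour concrete, here is a minimal sketch that counts the iterations and records the smallest and largest index. The class name ToExclusiveDemo and the Run helper are made up for illustration; only Parallel.For itself comes from the question.

```csharp
using System;
using System.Threading.Tasks;

static class ToExclusiveDemo
{
    // Hypothetical helper: runs Parallel.For over [fromInclusive, toExclusive)
    // and reports how many iterations ran plus the min/max index observed.
    public static (int Count, int Min, int Max) Run(int fromInclusive, int toExclusive)
    {
        int count = 0;
        int min = int.MaxValue;
        int max = int.MinValue;
        object sync = new object();

        Parallel.For(fromInclusive, toExclusive, i =>
        {
            // All shared state is updated under one lock so nothing is lost.
            lock (sync)
            {
                count++;
                if (i < min) min = i;
                if (i > max) max = i;
            }
        });

        return (count, min, max);
    }

    public static void Main()
    {
        var result = Run(0, 1000);
        // prints "iterations=1000, min=0, max=999"
        Console.WriteLine("iterations={0}, min={1}, max={2}",
            result.Count, result.Min, result.Max);
    }
}
```

So a Processed value of 999 or 998 is not explained by the loop bounds: all 1,000 iterations run, and the index simply never reaches 1000.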

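At first glance, your code runs in parallel, so make sure you aren't simply seeing the "999" call arrive out of order relative to the "998" call. Parallel execution is inherently unordered, so iterations can easily end up running in an effectively random order. Also, read up on lock, because your code may be reading values that it should be waiting for.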
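The ordering point can be checked with a small sketch like the one below. It assumes nothing from the question beyond Parallel.For; OrderingDemo and CollectCompletionOrder are hypothetical names. The order printed will vary from run to run, but every index should still appear exactly once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class OrderingDemo
{
    // Hypothetical helper: records the order in which iterations happened to run.
    public static int[] CollectCompletionOrder(int n)
    {
        var seen = new ConcurrentQueue<int>();
        Parallel.For(0, n, i => seen.Enqueue(i));
        return seen.ToArray();
    }

    public static void Main()
    {
        int[] order = CollectCompletionOrder(1000);

        // Whether the order is ascending depends on scheduling; do not rely on it.
        bool ascending = order.SequenceEqual(Enumerable.Range(0, 1000));
        Console.WriteLine("ran {0} iterations; ascending order: {1}", order.Length, ascending);

        // Regardless of order, each index 0..999 is present exactly once.
        bool allPresent = order.OrderBy(x => x).SequenceEqual(Enumerable.Range(0, 1000));
        Console.WriteLine("all indices present: {0}", allPresent);
    }
}
```

Out-of-order execution alone therefore does not lose iterations; a final count below 1,000 points to lost updates on shared state instead.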

It looks like updates to batch.Processed are being lost. The increment is not atomic, so batch.Processed++; is racy. Use Interlocked.Increment.
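A rough sketch of the Interlocked.Increment approach follows. One caveat: Interlocked.Increment takes a ref to a variable, so it cannot be applied directly to the auto-property Batch.Processed from the question. The CountedBatch class below is a hypothetical stand-in that backs Processed with a field; it is not the question's Batch class. The racy version is included only for comparison.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Batch: Processed is backed by a field because
// Interlocked.Increment needs a ref to a variable, not a property.
class CountedBatch
{
    private int _processed;
    public int Total { get; set; }
    public int Processed { get { return Volatile.Read(ref _processed); } }

    // Atomically adds one and returns the new value.
    public int IncrementProcessed() { return Interlocked.Increment(ref _processed); }
}

static class InterlockedDemo
{
    // Atomic version: no increments are lost.
    public static (int Processed, int FinishedSignals) RunAtomic(int n)
    {
        var batch = new CountedBatch { Total = n };
        int finishedSignals = 0;

        Parallel.For(0, n, i =>
        {
            // Compare the value returned by the increment, not a later re-read,
            // so exactly one iteration observes "this was the last item".
            int newValue = batch.IncrementProcessed();
            if (newValue == batch.Total)
                Interlocked.Increment(ref finishedSignals);
        });

        return (batch.Processed, finishedSignals);
    }

    // Racy version for comparison: ++ is a read-modify-write, so concurrent
    // increments can overwrite each other and the result may be below n.
    public static int RunRacy(int n)
    {
        int processed = 0;
        Parallel.For(0, n, i => { processed++; });
        return processed;
    }

    public static void Main()
    {
        var atomic = RunAtomic(1000);
        Console.WriteLine("atomic: processed={0}, finished signals={1}",
            atomic.Processed, atomic.FinishedSignals);
        Console.WriteLine("racy:   processed={0} (may be less than 1000)", RunRacy(1000));
    }
}
```

Compared with the lock in the solution above, this keeps the counter update lock-free, but it only protects the counter. Setting Finished and Processing together with the count would still need the lock, or a design where only the iteration that sees newValue == Total writes those fields.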

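It seems to me that your understanding of threading is not yet deep enough. Doing threading this elaborate without a solid understanding is very dangerous. The mistakes you make are hard to catch in testing, but production will find them.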