ConcurrentBag 和多线程应用程序的潜在竞争条件

Potential race conditions with ConcurrentBag and multithreaded application

在过去的几个月里,我一直在思考如何改进我使用 DispatcherTimer 定期检查资源以查看是否需要 updated/processed 的流程。更新资源(“产品”)后,将产品移至流程的下一步等。资源可能立即可用,也可能不可用。

我一直在挣扎的原因有两个。一个原因是我想异步实现这个过程,因为它目前只是同步的。第二个原因是我已经确定了我的实现卡住的区域,这似乎不是一个不常见的设计模式,但我不知道如何简洁地描述它,所以我无法弄清楚如何从 google.

一个相当重要的注意事项是我通过直接 USB 连接访问这些产品,因此我使用 LibUsbDotNet 与设备连接。我已将 USB 连接设为异步,因此我可以同时连接到多个产品并一次处理任意数量。


public Class Product
{
 public bool IsSoftwareUpdated = false;
 public bool IsProductInformationCorrect = false;
 public bool IsEOLProcessingCompleted = false;

 public Product(){}
 ~Product()
}

public class ProcessProduct
{
 List<Product> bagOfProducts                   = new List<Product>(new Product[10]);

 ConcurrentBag<Product> UnprocessedUnits       = new ConcurrentBag<Product>();
 ConcurrentBag<Product> CurrentlyUpdating      = new ConcurrentBag<Product>();
 ConcurrentBag<Product> CurrentlyVerifyingInfo = new ConcurrentBag<Product>();
 ConcurrentBag<Product> FinishedProcessing     = new ConcurrentBag<Product>();

 DispatcherTimer _timer = new DispatcherTimer();

 public ProcessProduct()
 {
     _timer.Tick += Timer_Tick;                            //Every 1 second, call Timer_Tick
     _timer.Interval = new TimeSpan(0,0,1);                //1 Second timer
     
     bagOfProducts.ForEach(o => UnprocessedUnits.Add(o));  //Fill the UnprocessedUnits with all products
 
     StartProcessing();
 }
 private void StartProcessing()
 {
     _timer.Start();
 }

 private void Timer_Tick(object sender, EventArgs e)
 {
     ProductOrganizationHandler();

     foreach(Product prod in CurrentlyUpdating.ToList())
     {
         UpdateProcessHandler(prod);  //Async function that uses await
     }
     foreach(Product prod in CurrentlyVerifyingInfo.ToList())
     {
         VerifyingInfoHandler(prod);  //Async function that uses Await
     }
     if(FinishedProcessing.Count == bagOfProducts.Count)
     {
         _timer.Stop();  //If all items have finished processing, then stop the process
     }
 }
 
 private void ProductOrganizationHandler()
 {
     //Take(read REMOVE) Product from each ConcurrentBag  1 by 1 and moves that item to the bag that it needs to go
     //depending on which process step is finished
     //(or puts it back in the same bag if that step was not finished).
     //E.G, all items are moved from UnprocessUnits to CurrentlyUpdating or CurrentlyVerifying etc.
     //If a product is finished updating, it is moved from CurrentlyUpdating to CurrentlyVerifying or FinishedProcessing
 }
 private async void UpdateProcessHandler(Product prod)
 {
     await Task.Delay(1000).ConfigureAwait(false);
     //Does some actual work validating USB communication and then running through the USB update
 }
 private async void VerifyingInfoHandler(Product prod)
 {
     await Task.Delay(1000).ConfigureAwait(false);
     //Does actual work here and communicates with the product via USB
 }
}

可通过 my code on Pastebin.

获得完整的编译就绪代码示例

所以,我的问题真的是:这段代码中是否有任何有意义的竞争条件?具体来说,使用 ProductOrganizationHandler() 代码和 Timer_Tick() 中的 ConcurrentBags 循环(因为对 Timer_Tick() 每秒发生一次)。我确信这段代码在大多数情况下都能正常工作,但我担心以后会出现难以追踪的错误,因为在 ProductOrganizationHandler()[=46 时出现了罕见的竞争条件=] 由于一些愚蠢的原因需要 > 1 秒到 运行。

作为辅助说明:这甚至是此类流程的最佳设计模式吗? C# 是我的第一门 OOP 语言,并且在工作中都是自学的(我几乎所有的工作都是嵌入式 C),所以我没有任何正式的 OOP 设计模式经验。

我的主要目标是在每个设备通过 USB 可用时异步 Update/Verify/Communicate。一旦列表中的所有产品都完成(或超时),则该过程结束。该项目在 .NET 5 中。

编辑:对于后来遇到同样问题的任何人,这就是我所做的。

我不明白 DispatcherTimer 将 Ticks 添加到 Dispatcher queue。这意味着只有 运行 如果还没有另一个 Tick 实例已经 运行ning 或者换句话说,Timer_Tick 将 运行 在下一个 Timer_Tick实例运行s.

所以,大多数(所有?)我的 Threading/concurrency 担忧都是没有根据的,我可以将 Timer_Tick 视为单线程非并发函数(它是)。

此外,为了防止 Ticks 堆积,我在 Timer_Tick 开始时 运行 _timer.Stop() 并在 Timer_Tick 结束时重新启动计时器。

首先,您使用的是 DispatchTimer,这会在 UI 线程上引发滴答声。据我所知,示例中没有进行多线程。还有其他计时器,如 System.Timers.Timer 会在后台线程上引发事件,如果这是有意的话。但是如果你只是想经常检查和更新状态,而不是 运行ning 任何阻塞的代码,那么只使用 UI 线程就可以了,并且会大大简化事情。

即使我们假设 ProductOrganizationHandler 在工作线程上执行了 运行,从一个并发集合中删除项目并将它们放入另一个集合通常仍然是安全的。但它不能保证项目按任何特定顺序处理,也不保证任何特定项目由计时器的给定滴答声处理。但是由于计时器会周期性地滴答作响,所以最终应该处理所有项目。请记住,大多数计时器都需要处理,因此您需要以某种方式处理它,包括处理是否过早停止。

请记住,async 并不意味着并发,因此除非您的 USB 库提供异步方法,否则我不会使用它。即使那样我也会避免 async void 因为这会引发捕获的同步上下文的异常,可能会导致应用程序崩溃,所以它应该主要用于最外层,比如按钮事件处理程序或计时器,然后你应该处理以某种方式出现异常。

至于最好的方法,我会看看 DataFlow 库。