在 C# 中打印从 BlockingCollection 获取的数据表时 Foreach 抛出错误

Foreach throwing errors when printing datatable obtained from BlockingCollection in C#

我在尝试多线程时遇到了绊脚石。我想我知道问题出在哪里,但无法确定如何解决。但我可能错了。

总而言之,我有生产者线程和消费者线程。生产者线程从外部源收集数据到 datatable 中,然后将它们放入集合中。消费者然后从集合中获取数据tables。我将 BlockingCollection 用作 public 静态集合,这样两个线程都可以访问它,它存在于两个不同的 类 中。我现在将展示代码的主要部分,然后解释什么是有效的,什么是无效的。

生产者线程:

try
{
     dataTable.Clear();
     adapter.Fill(dataTable);
     dataCaptured = true;
     timeout = 0;
     ThreadInfo.setCurrentDate(startDate);
     ThreadInfo.dataTableCollection.Add(dataTable);
}

消费者线程

while(true)
{
     DataTable testTable = ThreadInfo.dataTableCollection.Take();
     foreach (DataRow datarow in testTable.Rows)
     {
          foreach (var item in datarow.ItemArray)
          {
                Console.WriteLine(item);
          }
     }
}

所以我的测试表明,当生产者线程创建数据table时,它成功地将它们添加到集合中。我可以通过在 add 方法之前和之后使用 count 来看到这一点。计算每个 table 中的行数,我还可以确认添加的 table 与创建的 table 相同。此外,take 方法还成功删除了 table 并且 table 与输入的相匹配。我通过计算集合中 table 的数量和计算 'taken' 数据 table 中的行数来了解这一点。

我的问题是当我尝试 运行 foreach 循环打印出结果时。最初它工作并开始将数据打印到屏幕,但随后抛出此错误:

System.InvalidOperationException was unhandled
  HResult=-2146233079
  Message=Collection was modified; enumeration operation might not execute.
  Source=System.Data
  StackTrace:
       at System.Data.RBTree`1.RBTreeEnumerator.MoveNext()
       at pullPlexTable.InputThreads.dataConsumerThread() in \srv-file01\users$\dkb\Visual Studio 2013\Projects\pullPlexTable\pullPlexTable\InputThread.cs:line 39
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.ThreadHelper.ThreadStart()
  InnerException:

我相信这似乎是在将新数据table 添加到集合中时发生的。我相信这是由显示 .movenext() 遇到错误的错误支持的,这意味着它试图移动到集合中的下一个值?

但我真的不知道该怎么办,即使我是对的。我试过复制数据table,但仍然遇到错误。我本以为,一旦它创建了一个数据table并使用集合中的 take 方法复制了数据table,我就可以随心所欲地对其进行迭代。我的直觉是 datatable 仍然指向集合中的 datatable,当它随着新 datatable 的进入而下降时,它会抛出错误。但这完全是猜测,可能是完全错误的。有人可以帮忙吗?

如果您需要更多信息,我很乐意post。

您每次都需要创建一个新的 DataTable - 它们是可变的,并通过引用传递。

到目前为止,您所做的是使引用本身成为线程安全的。但这还不够——所有线程仍在共享 DataTable.

的同一个实例

相反,制作人可能看起来像这样:

 var dt = new DataTable();
 adapter.Fill(dt);

 ThreadInfo.dataTableCollection.Add(dt);

我已经删除了你周围的其他东西,因为它很可能也是错误的 - 你跨线程共享的任何资源都必须是线程安全的或同步的。最简单的方法是确保您只能在 lock 内读写它们,显式同步访问:

private static object syncObject = new object();

private static DateTime currentDate;
public static DateTime CurrentDate
{
  get { lock (syncObject) return currentDate; }
  set { lock (syncObject) currentDate = value; }
}

这些只是最基本的。您真的不想在多线程环境中四处猜测。多线程困难。至少,我建议您阅读精彩的 http://www.albahari.com/threading/ - 它会教您一些基本概念。和谦逊 :D

确保安全多线程的最简单方法是确保您从不在线程之间传递任何可变对象——当然,很难用[=来确保这一点15=]秒;一个好的准则是任何 public static 成员默认情况下都必须是线程安全的。如果你只是 运行 一个新的 Task 给定的(不可变的或非共享的)参数,并使用 return 值,你会使多线程更容易。

对于生产者-消费者队列,请确保您传递的是不可变数据或您不会重复使用的数据(例如 "new data table, fill it, pass it, forget it" 方法)。如果您需要 BlockingCollection 本身未提供的任何信号,请确保它是线程安全的。理想情况下,您希望尽可能使用高级构造 - CancellationTokenTaskManualResetEvent 等 - 请参阅上面的 link。如果你使用按值类型(一直向下 - 将引用包装在结构中显然不会帮助你),它将为你省去很多麻烦,但即使那样,你也需要锁定阅读和写作。