使用多线程时应用程序意外关闭

Application unexpectedly closes while using multithreading

我正在尝试使用 BlockingCollection 的非常简单的生产者消费者方法,按照此处描述的文章:MS Docs 了解多线程。

我的生产者是一个单一的任务,它从一个 XML 文件(有大约 4000 个节点)中读取并将 XElement 个节点推送到一个阻塞集合。

我的消费者将有多个线程读取阻塞集合并将文件上传到基于 XElement 的站点。

这里的问题是每次我尝试 运行 程序都会意外关闭。它击中了生产者 Task.Run 但之后停止了。我不明白为什么。难道我做错了什么?它甚至没有达到 catch 块。

代码如下:

            BlockingCollection<XElement> collection = new BlockingCollection<XElement>(100);                
            string metadataFilePath = exportLocation + listTitle + "\Metadata\" + exportJobId + ".xml";
            //create the producer
            Task.Run(() =>
            {                    
                //Process only the files that have not been uploaded                                   
                XDocument xmlFile = XDocument.Load(metadataFilePath);
                var query = from c in xmlFile.Elements("Items").Elements("Item")
                            where c.Attribute("IsUploaded").Value == "No"
                            select c;
                foreach (var item in query)
                {
                    collection.Add(item);
                }
                collection.CompleteAdding();
            });

            //process consumer
            Parallel.ForEach(collection, (new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 2 }), (metadata) => {
                ProcessItems();
            });

假设您正在尝试 运行 控制台应用程序,我可以想到以下问题:

  1. C# 中的任务默认为 后台线程,即它们无法使应用程序保持活动状态。如果作为前台线程的主线程退出,您的后台线程也将停止执行。
  2. 考虑到 #1,您的并行块可能在生产者线程生成任何数据之前执行,因此 程序退出导致后台生产者线程终止 还有。尝试在循环中使用 TryTake() 从集合中读取的消费者任务开始,并在程序中添加对 Console.ReadLine() 的调用,以确保控制台无法在用户不按回车键的情况下退出。如果您想并行消费,请参见示例 2 here.

您可以查看更多示例here。 尝试注意示例代码中的以下内容:

  1. using 块的使用using (BlockingCollection bc = new BlockingCollection())
  2. 在第一个线程中调用方法 CompleteAdding(),这表明集合将不再接受生产者添加的任何项目。一旦所有项目都被生产者线程调用,added.After 集合已标记为完成添加,不允许添加到集合中,并且当集合为空时尝试从集合中删除将不会等待.

  3. 在第二个示例中,消费者线程使用 TryTake(out result)。 消费者线程启动并尝试取出 value.Even 如果生产者线程没有添加任何项目,它将继续等待,因为生产者线程调用 CompleteAdding() 尚未将集合标记为 IsAddingCompleted。当集合已标记为 IsAddingCompleted 且集合为空时,消费者线程将从 TryTake 获得一个假 return 值,即集合的 IsCompleted 属性 变为真,允许消费者线程完成。

4.Call 到 Console.ReadLine() 这样作为后台线程的两个任务都不会在未完成的情况下终止。

希望对您有所帮助。

问题中的 是正确的。

我建议用 Microsoft TPL Dataflow 来解决您的 producer/consumer 问题:

using System.Threading.Tasks.Dataflow;

var parallelBoundedOptions = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 100,
    MaxDegreeOfParallelism = 2,
};
var uploadItemBlock = new ActionBlock<XElement>(
    item => ProcessItem(item),
    parallelBoundedOptions
);
string metadataFilePath = exportLocation + listTitle + "\Metadata\" + exportJobId + ".xml";
XDocument xmlFile = XDocument.Load(metadataFilePath);
var query = from c in xmlFile.Elements("Items").Elements("Item")
            where c.Attribute("IsUploaded").Value == "No"
            select c;
foreach (var item in query)
{
    uploadItemBlock.SendAsync(item).Wait();
}
uploadItemBlock.Complete();
uploadItemBlock.Completion.Wait();

Dataflow 使人们更容易专注于生产和消费项目,而不是如何将项目从生产者传递给消费者。

问题中的实际问题是 Parallel.Foreach is using BlockingCollection<T>.IEnumerable<T>.GetEnumerator instead of BlockingCollection<T>.GetConsumingEnumerable 如下所示:

static void Main()
{
    var collection = new BlockingCollection<int>(100);
    Task.Run(()=>
    {
        foreach (var element in Enumerable.Range(0, 100_000))
        {
            collection.Add(element);
        }
        collection.CompleteAdding();
    });

    Parallel.ForEach(
        collection, 
        new ParallelOptions { MaxDegreeOfParallelism = 2},
        i => Console.WriteLine(i));

    Console.WriteLine("Done");
}

立即打印 "Done"

static void Main()
{
    var collection = new BlockingCollection<int>(100);
    Task.Run(()=>
    {
        foreach (var element in Enumerable.Range(0, 100_000))
        {
            collection.Add(element);
        }
        collection.CompleteAdding();
    });

    Parallel.ForEach(
        collection.GetConsumingEnumerable(), 
        new ParallelOptions { MaxDegreeOfParallelism = 2},
        i => Console.WriteLine(i));

    Console.WriteLine("Done");
}

打印所有数字