使用多线程时应用程序意外关闭
Application unexpectedly closes while using multithreading
我正在尝试使用 BlockingCollection
的非常简单的生产者消费者方法,按照此处描述的文章:MS Docs 了解多线程。
我的生产者是一个单一的任务,它从一个 XML 文件(有大约 4000 个节点)中读取并将 XElement
个节点推送到一个阻塞集合。
我的消费者将有多个线程读取阻塞集合并将文件上传到基于 XElement
的站点。
这里的问题是每次我尝试 运行 程序都会意外关闭。它击中了生产者 Task.Run
但之后停止了。我不明白为什么。难道我做错了什么?它甚至没有达到 catch
块。
代码如下:
BlockingCollection<XElement> collection = new BlockingCollection<XElement>(100);
string metadataFilePath = exportLocation + listTitle + "\Metadata\" + exportJobId + ".xml";
//create the producer
Task.Run(() =>
{
//Process only the files that have not been uploaded
XDocument xmlFile = XDocument.Load(metadataFilePath);
var query = from c in xmlFile.Elements("Items").Elements("Item")
where c.Attribute("IsUploaded").Value == "No"
select c;
foreach (var item in query)
{
collection.Add(item);
}
collection.CompleteAdding();
});
//process consumer
Parallel.ForEach(collection, (new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 2 }), (metadata) => {
ProcessItems();
});
假设您正在尝试 运行 控制台应用程序,我可以想到以下问题:
- C# 中的任务默认为 后台线程,即它们无法使应用程序保持活动状态。如果作为前台线程的主线程退出,您的后台线程也将停止执行。
- 考虑到 #1,您的并行块可能在生产者线程生成任何数据之前执行,因此 程序退出导致后台生产者线程终止 还有。尝试在循环中使用 TryTake() 从集合中读取的消费者任务开始,并在程序中添加对 Console.ReadLine() 的调用,以确保控制台无法在用户不按回车键的情况下退出。如果您想并行消费,请参见示例 2 here.
您可以查看更多示例here。
尝试注意示例代码中的以下内容:
- using 块的使用using (BlockingCollection bc = new BlockingCollection())
在第一个线程中调用方法 CompleteAdding(),这表明集合将不再接受生产者添加的任何项目。一旦所有项目都被生产者线程调用,added.After 集合已标记为完成添加,不允许添加到集合中,并且当集合为空时尝试从集合中删除将不会等待.
在第二个示例中,消费者线程使用 TryTake(out result)。
消费者线程启动并尝试取出 value.Even 如果生产者线程没有添加任何项目,它将继续等待,因为生产者线程调用 CompleteAdding() 尚未将集合标记为 IsAddingCompleted。当集合已标记为 IsAddingCompleted 且集合为空时,消费者线程将从 TryTake 获得一个假 return 值,即集合的 IsCompleted 属性 变为真,允许消费者线程完成。
4.Call 到 Console.ReadLine() 这样作为后台线程的两个任务都不会在未完成的情况下终止。
希望对您有所帮助。
问题中的 是正确的。
我建议用 Microsoft TPL Dataflow 来解决您的 producer/consumer 问题:
using System.Threading.Tasks.Dataflow;
var parallelBoundedOptions = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 100,
MaxDegreeOfParallelism = 2,
};
var uploadItemBlock = new ActionBlock<XElement>(
item => ProcessItem(item),
parallelBoundedOptions
);
string metadataFilePath = exportLocation + listTitle + "\Metadata\" + exportJobId + ".xml";
XDocument xmlFile = XDocument.Load(metadataFilePath);
var query = from c in xmlFile.Elements("Items").Elements("Item")
where c.Attribute("IsUploaded").Value == "No"
select c;
foreach (var item in query)
{
uploadItemBlock.SendAsync(item).Wait();
}
uploadItemBlock.Complete();
uploadItemBlock.Completion.Wait();
Dataflow 使人们更容易专注于生产和消费项目,而不是如何将项目从生产者传递给消费者。
问题中的实际问题是 Parallel.Foreach
is using BlockingCollection<T>.IEnumerable<T>.GetEnumerator
instead of BlockingCollection<T>.GetConsumingEnumerable
如下所示:
static void Main()
{
var collection = new BlockingCollection<int>(100);
Task.Run(()=>
{
foreach (var element in Enumerable.Range(0, 100_000))
{
collection.Add(element);
}
collection.CompleteAdding();
});
Parallel.ForEach(
collection,
new ParallelOptions { MaxDegreeOfParallelism = 2},
i => Console.WriteLine(i));
Console.WriteLine("Done");
}
立即打印 "Done"
static void Main()
{
var collection = new BlockingCollection<int>(100);
Task.Run(()=>
{
foreach (var element in Enumerable.Range(0, 100_000))
{
collection.Add(element);
}
collection.CompleteAdding();
});
Parallel.ForEach(
collection.GetConsumingEnumerable(),
new ParallelOptions { MaxDegreeOfParallelism = 2},
i => Console.WriteLine(i));
Console.WriteLine("Done");
}
打印所有数字
我正在尝试使用 BlockingCollection
的非常简单的生产者消费者方法,按照此处描述的文章:MS Docs 了解多线程。
我的生产者是一个单一的任务,它从一个 XML 文件(有大约 4000 个节点)中读取并将 XElement
个节点推送到一个阻塞集合。
我的消费者将有多个线程读取阻塞集合并将文件上传到基于 XElement
的站点。
这里的问题是每次我尝试 运行 程序都会意外关闭。它击中了生产者 Task.Run
但之后停止了。我不明白为什么。难道我做错了什么?它甚至没有达到 catch
块。
代码如下:
BlockingCollection<XElement> collection = new BlockingCollection<XElement>(100);
string metadataFilePath = exportLocation + listTitle + "\Metadata\" + exportJobId + ".xml";
//create the producer
Task.Run(() =>
{
//Process only the files that have not been uploaded
XDocument xmlFile = XDocument.Load(metadataFilePath);
var query = from c in xmlFile.Elements("Items").Elements("Item")
where c.Attribute("IsUploaded").Value == "No"
select c;
foreach (var item in query)
{
collection.Add(item);
}
collection.CompleteAdding();
});
//process consumer
Parallel.ForEach(collection, (new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 2 }), (metadata) => {
ProcessItems();
});
假设您正在尝试 运行 控制台应用程序,我可以想到以下问题:
- C# 中的任务默认为 后台线程,即它们无法使应用程序保持活动状态。如果作为前台线程的主线程退出,您的后台线程也将停止执行。
- 考虑到 #1,您的并行块可能在生产者线程生成任何数据之前执行,因此 程序退出导致后台生产者线程终止 还有。尝试在循环中使用 TryTake() 从集合中读取的消费者任务开始,并在程序中添加对 Console.ReadLine() 的调用,以确保控制台无法在用户不按回车键的情况下退出。如果您想并行消费,请参见示例 2 here.
您可以查看更多示例here。 尝试注意示例代码中的以下内容:
- using 块的使用using (BlockingCollection bc = new BlockingCollection())
在第一个线程中调用方法 CompleteAdding(),这表明集合将不再接受生产者添加的任何项目。一旦所有项目都被生产者线程调用,added.After 集合已标记为完成添加,不允许添加到集合中,并且当集合为空时尝试从集合中删除将不会等待.
在第二个示例中,消费者线程使用 TryTake(out result)。 消费者线程启动并尝试取出 value.Even 如果生产者线程没有添加任何项目,它将继续等待,因为生产者线程调用 CompleteAdding() 尚未将集合标记为 IsAddingCompleted。当集合已标记为 IsAddingCompleted 且集合为空时,消费者线程将从 TryTake 获得一个假 return 值,即集合的 IsCompleted 属性 变为真,允许消费者线程完成。
4.Call 到 Console.ReadLine() 这样作为后台线程的两个任务都不会在未完成的情况下终止。
希望对您有所帮助。
问题中的
我建议用 Microsoft TPL Dataflow 来解决您的 producer/consumer 问题:
using System.Threading.Tasks.Dataflow;
var parallelBoundedOptions = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 100,
MaxDegreeOfParallelism = 2,
};
var uploadItemBlock = new ActionBlock<XElement>(
item => ProcessItem(item),
parallelBoundedOptions
);
string metadataFilePath = exportLocation + listTitle + "\Metadata\" + exportJobId + ".xml";
XDocument xmlFile = XDocument.Load(metadataFilePath);
var query = from c in xmlFile.Elements("Items").Elements("Item")
where c.Attribute("IsUploaded").Value == "No"
select c;
foreach (var item in query)
{
uploadItemBlock.SendAsync(item).Wait();
}
uploadItemBlock.Complete();
uploadItemBlock.Completion.Wait();
Dataflow 使人们更容易专注于生产和消费项目,而不是如何将项目从生产者传递给消费者。
问题中的实际问题是 Parallel.Foreach
is using BlockingCollection<T>.IEnumerable<T>.GetEnumerator
instead of BlockingCollection<T>.GetConsumingEnumerable
如下所示:
static void Main()
{
var collection = new BlockingCollection<int>(100);
Task.Run(()=>
{
foreach (var element in Enumerable.Range(0, 100_000))
{
collection.Add(element);
}
collection.CompleteAdding();
});
Parallel.ForEach(
collection,
new ParallelOptions { MaxDegreeOfParallelism = 2},
i => Console.WriteLine(i));
Console.WriteLine("Done");
}
立即打印 "Done"
static void Main()
{
var collection = new BlockingCollection<int>(100);
Task.Run(()=>
{
foreach (var element in Enumerable.Range(0, 100_000))
{
collection.Add(element);
}
collection.CompleteAdding();
});
Parallel.ForEach(
collection.GetConsumingEnumerable(),
new ParallelOptions { MaxDegreeOfParallelism = 2},
i => Console.WriteLine(i));
Console.WriteLine("Done");
}
打印所有数字