Parallel.Foreach c# .net4.5.1 的多线程处理
Multi Thread Processing with Parallel.Foreach c# .net4.5.1
我有一个 IEnumerable<customClass>
对象,大约有 10-15 个条目,所以不是很多,但是当我尝试做 运行 进入 System.IO.FileNotFoundException
Parallel.Foreach(..some linq query.., object => { ...stuff....});
与枚举。这是我的代码,有时有效,有时无效:
IEnumerable<UserIdentifier> userIds = script.Entries.Select(x => x.UserIdentifier).Distinct();
await Task.Factory.StartNew(() =>
{
Parallel.ForEach(userIds, async userId =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
_Log.InfoFormat("user identifier: {0}", userId);
await Task.Factory.StartNew(() =>
{
foreach (ScriptEntry se in script.Entries.Where(x => x.UserIdentifier.Equals(userId)))
{
// // Run the script //
_Log.InfoFormat("waiting {0}", se.Delay);
Task.Delay(se.Delay);
_Log.InfoFormat("running SelectionInformation{0}", se.SelectionInformation);
ExecuteSingleEntry(se);
_Log.InfoFormat("[====== SelectionInformation {1} ELAPSED TIME: {0} ======]", watch.Elapsed,
se.SelectionInformation.Verb);
}
});
watch.Stop();
_Log.InfoFormat("[====== TOTAL ELAPSED TIME: {0} ======]", watch.Elapsed);
});
});
当函数 ExecuteSingleEntry
为 运行 时,在该函数的深处有一个函数调用了几次,它创建了一个临时目录和文件。在我看来,当我 运行 parallel.foreach
时,该函数立即被大量调用猛烈抨击(我目前正在一次测试 5 个,但需要处理大约 10 个)并且没有创建我需要的一些文件。但是,如果我在文件创建函数中遇到断点,并且每次遇到断点时都只是F5
,那么抛出文件未找到异常时我不会有任何问题。
所以,我的问题是,如何根据脚本条目中的用户 ID,延迟 运行 并行地实现 scripts.Entries
的一个子集正在启动的每个不同用户 ID 条目之间间隔 1 秒?
脚本条目如下:
UserIdentifier: 141, SelectionInformation: class of stuff, Ids: list of EntryIds, Names: list of Entry Names
并且每个用户标识符可以在数组中出现 1 次或多次。我想一次或多或少地启动所有不同的用户标识符。然后 Task
出不同的 SelectionInformation 绑定到一个脚本条目。
scripts.Entries
是ScriptEntry
的数组,如下:
[DataMember]
public TimeSpan Delay { get; set; }
[DataMember]
public SelectionInformation Selection { get; set; }
[DataMember]
public long[] Ids { get; set; }
[DataMember]
public string Names { get; set; }
[DataMember]
public long UserIdentifier { get; set; }
我参考了:Parallel.ForEach vs Task.Factory.StartNew 获得
Task.Factory.StartNew(() => Parallel.Foreach({ }) )
所以我的 UI 不会锁定我
有几个原则可以应用:
- Prefer
Task.Run
over Task.Factory.StartNew
。我在我的博客上描述了为什么 StartNew
是危险的; Run
是一种更安全、更现代的替代方案。
- 不要将
async
lambda 传递给 Parallel.ForEach
。没有意义,也不行。
Task.Delay
本身不做任何事情。您要么必须 await
要么使用同步版本 (Thread.Sleep
).
(事实上,在你的情况下,内部 StartNew
是没有意义的;它已经是并行的,并且线程池线程上的代码 - 运行 正试图在一个线程池线程并立即异步等待它???)
应用这些原则后:
await Task.Run(() =>
{
Parallel.ForEach(userIds, userId =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
_Log.InfoFormat("user identifier: {0}", userId);
foreach (ScriptEntry se in script.Entries.Where(x => x.UserIdentifier.Equals(userId)))
{
// // Run the script //
_Log.InfoFormat("waiting {0}", se.Delay);
Thread.Sleep(se.Delay);
_Log.InfoFormat("running SelectionInformation{0}", se.SelectionInformation);
ExecuteSingleEntry(se);
_Log.InfoFormat("[====== SelectionInformation {1} ELAPSED TIME: {0} ======]", watch.Elapsed,
se.SelectionInformation.Verb);
}
watch.Stop();
_Log.InfoFormat("[====== TOTAL ELAPSED TIME: {0} ======]", watch.Elapsed);
});
});
我有一个 IEnumerable<customClass>
对象,大约有 10-15 个条目,所以不是很多,但是当我尝试做 运行 进入 System.IO.FileNotFoundException
Parallel.Foreach(..some linq query.., object => { ...stuff....});
与枚举。这是我的代码,有时有效,有时无效:
IEnumerable<UserIdentifier> userIds = script.Entries.Select(x => x.UserIdentifier).Distinct();
await Task.Factory.StartNew(() =>
{
Parallel.ForEach(userIds, async userId =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
_Log.InfoFormat("user identifier: {0}", userId);
await Task.Factory.StartNew(() =>
{
foreach (ScriptEntry se in script.Entries.Where(x => x.UserIdentifier.Equals(userId)))
{
// // Run the script //
_Log.InfoFormat("waiting {0}", se.Delay);
Task.Delay(se.Delay);
_Log.InfoFormat("running SelectionInformation{0}", se.SelectionInformation);
ExecuteSingleEntry(se);
_Log.InfoFormat("[====== SelectionInformation {1} ELAPSED TIME: {0} ======]", watch.Elapsed,
se.SelectionInformation.Verb);
}
});
watch.Stop();
_Log.InfoFormat("[====== TOTAL ELAPSED TIME: {0} ======]", watch.Elapsed);
});
});
当函数 ExecuteSingleEntry
为 运行 时,在该函数的深处有一个函数调用了几次,它创建了一个临时目录和文件。在我看来,当我 运行 parallel.foreach
时,该函数立即被大量调用猛烈抨击(我目前正在一次测试 5 个,但需要处理大约 10 个)并且没有创建我需要的一些文件。但是,如果我在文件创建函数中遇到断点,并且每次遇到断点时都只是F5
,那么抛出文件未找到异常时我不会有任何问题。
所以,我的问题是,如何根据脚本条目中的用户 ID,延迟 运行 并行地实现 scripts.Entries
的一个子集正在启动的每个不同用户 ID 条目之间间隔 1 秒?
脚本条目如下:
UserIdentifier: 141, SelectionInformation: class of stuff, Ids: list of EntryIds, Names: list of Entry Names
并且每个用户标识符可以在数组中出现 1 次或多次。我想一次或多或少地启动所有不同的用户标识符。然后 Task
出不同的 SelectionInformation 绑定到一个脚本条目。
scripts.Entries
是ScriptEntry
的数组,如下:
[DataMember]
public TimeSpan Delay { get; set; }
[DataMember]
public SelectionInformation Selection { get; set; }
[DataMember]
public long[] Ids { get; set; }
[DataMember]
public string Names { get; set; }
[DataMember]
public long UserIdentifier { get; set; }
我参考了:Parallel.ForEach vs Task.Factory.StartNew 获得
Task.Factory.StartNew(() => Parallel.Foreach({ }) )
所以我的 UI 不会锁定我
有几个原则可以应用:
- Prefer
Task.Run
overTask.Factory.StartNew
。我在我的博客上描述了为什么StartNew
是危险的;Run
是一种更安全、更现代的替代方案。 - 不要将
async
lambda 传递给Parallel.ForEach
。没有意义,也不行。 Task.Delay
本身不做任何事情。您要么必须await
要么使用同步版本 (Thread.Sleep
).
(事实上,在你的情况下,内部 StartNew
是没有意义的;它已经是并行的,并且线程池线程上的代码 - 运行 正试图在一个线程池线程并立即异步等待它???)
应用这些原则后:
await Task.Run(() =>
{
Parallel.ForEach(userIds, userId =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
_Log.InfoFormat("user identifier: {0}", userId);
foreach (ScriptEntry se in script.Entries.Where(x => x.UserIdentifier.Equals(userId)))
{
// // Run the script //
_Log.InfoFormat("waiting {0}", se.Delay);
Thread.Sleep(se.Delay);
_Log.InfoFormat("running SelectionInformation{0}", se.SelectionInformation);
ExecuteSingleEntry(se);
_Log.InfoFormat("[====== SelectionInformation {1} ELAPSED TIME: {0} ======]", watch.Elapsed,
se.SelectionInformation.Verb);
}
watch.Stop();
_Log.InfoFormat("[====== TOTAL ELAPSED TIME: {0} ======]", watch.Elapsed);
});
});