Parallel.Foreach c# .net4.5.1 的多线程处理

Multi Thread Processing with Parallel.Foreach c# .net4.5.1

我有一个 IEnumerable<customClass> 对象,大约有 10-15 个条目,所以不是很多,但是当我尝试做 运行 进入 System.IO.FileNotFoundException

Parallel.Foreach(..some linq query.., object => { ...stuff....});

与枚举。这是我的代码,有时有效,有时无效:

        IEnumerable<UserIdentifier> userIds = script.Entries.Select(x => x.UserIdentifier).Distinct();

        await Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(userIds, async userId =>
            {
                Stopwatch watch = new Stopwatch();
                watch.Start();

                _Log.InfoFormat("user identifier: {0}", userId);
                await Task.Factory.StartNew(() =>
                {
                    foreach (ScriptEntry se in script.Entries.Where(x => x.UserIdentifier.Equals(userId)))
                    {
                        //    // Run the script //
                        _Log.InfoFormat("waiting {0}", se.Delay);
                        Task.Delay(se.Delay);
                        _Log.InfoFormat("running SelectionInformation{0}", se.SelectionInformation);

                        ExecuteSingleEntry(se);
                        _Log.InfoFormat("[====== SelectionInformation {1} ELAPSED TIME: {0} ======]", watch.Elapsed,
                            se.SelectionInformation.Verb);
                    }

                });

                watch.Stop();
                _Log.InfoFormat("[====== TOTAL ELAPSED TIME: {0} ======]", watch.Elapsed);
            });
        });

当函数 ExecuteSingleEntry 为 运行 时,在该函数的深处有一个函数调用了几次,它创建了一个临时目录和文件。在我看来,当我 运行 parallel.foreach 时,该函数立即被大量调用猛烈抨击(我目前正在一次测试 5 个,但需要处理大约 10 个)并且没有创建我需要的一些文件。但是,如果我在文件创建函数中遇到断点,并且每次遇到断点时都只是F5,那么抛出文件未找到异常时我不会有任何问题。

所以,我的问题是,如何根据脚本条目中的用户 ID,延迟 运行 并行地实现 scripts.Entries 的一个子集正在启动的每个不同用户 ID 条目之间间隔 1 秒?

脚本条目如下:

UserIdentifier: 141, SelectionInformation: class of stuff, Ids: list of EntryIds, Names: list of Entry Names

并且每个用户标识符可以在数组中出现 1 次或多次。我想一次或多或少地启动所有不同的用户标识符。然后 Task 出不同的 SelectionInformation 绑定到一个脚本条目。

scripts.EntriesScriptEntry的数组,如下:

    [DataMember]
    public TimeSpan Delay { get; set; }

    [DataMember]
    public SelectionInformation Selection { get; set; }

    [DataMember]
    public long[] Ids { get; set; }

    [DataMember]
    public string Names { get; set; }

    [DataMember]
    public long UserIdentifier { get; set; }

我参考了:Parallel.ForEach vs Task.Factory.StartNew 获得

Task.Factory.StartNew(() => Parallel.Foreach({ }) ) 所以我的 UI 不会锁定我

有几个原则可以应用:

  1. Prefer Task.Run over Task.Factory.StartNew。我在我的博客上描述了为什么 StartNew 是危险的; Run 是一种更安全、更现代的替代方案。
  2. 不要将 async lambda 传递给 Parallel.ForEach。没有意义,也不行。
  3. Task.Delay 本身不做任何事情。您要么必须 await 要么使用同步版本 (Thread.Sleep).

(事实上,在你的情况下,内部 StartNew 是没有意义的;它已经是并行的,并且线程池线程上的代码 - 运行 正试图在一个线程池线程并立即异步等待它???)

应用这些原则后:

await Task.Run(() =>
{
  Parallel.ForEach(userIds, userId =>
  {
    Stopwatch watch = new Stopwatch();
    watch.Start();

    _Log.InfoFormat("user identifier: {0}", userId);
    foreach (ScriptEntry se in script.Entries.Where(x => x.UserIdentifier.Equals(userId)))
    {
      //    // Run the script //
      _Log.InfoFormat("waiting {0}", se.Delay);
      Thread.Sleep(se.Delay);
      _Log.InfoFormat("running SelectionInformation{0}", se.SelectionInformation);

      ExecuteSingleEntry(se);
      _Log.InfoFormat("[====== SelectionInformation {1} ELAPSED TIME: {0} ======]", watch.Elapsed,
          se.SelectionInformation.Verb);
    }

    watch.Stop();
    _Log.InfoFormat("[====== TOTAL ELAPSED TIME: {0} ======]", watch.Elapsed);
  });
});