使用 Dispacher.BeginInvoke 从并行循环调用 UI 线程

Calling UI Thread from Parallel Loop using Dispacher.BeginInvoke

我在下面有一个简单的异步代码:这是一个带有一个按钮和一个文本框的 WPF。 我使用了一些包含五个整数的列表来模拟 5 个不同的任务。 我的意图是在 运行 并行和异步地执行所有五个任务时实现这一目标, 我可以观察到数字是一个一个地添加到文本框中的。 我做到了。方法“DoSomething”运行 并行执行所有五个任务,并且每个任务都有不同的执行时间(由 Task.Delay 模拟),因此所有结果都会使数字一一出现在文本框中。 我无法弄清楚的唯一问题是:为什么在文本框中我首先显示字符串文本“This is end text”?!如果我 await 方法 DoSomething 那么它应该首先完成,然后其余代码应该是 executed.Even 尽管在我的例子中是 GUI 的重新绘制。

我猜想这可能是由于使用了 Dispacher.BeginInvoke 可能对 async/await 机制“造成一些干扰”造成的。但我会很感激小线索以及如何避免这种行为。 我知道我可以使用 Progress 事件来实现类似的效果,但是有没有其他方法可以在 WPF 中使用并行循环并逐步更新结果,从而避免我描述的这种意外行为?

 private async void Button_Click(object sender, RoutedEventArgs e)
    {
        await DoSomething();

        tbResults.Text += "This is end text";
    }


    private async Task DoSomething()
    {
        List<int> numbers = new List<int>(Enumerable.Range(1, 5));

      await Task.Run(()=> Parallel.ForEach(numbers,async  i =>
        {
          await Task.Delay(i * 300);
          await Dispatcher.BeginInvoke(() => tbResults.Text += i.ToString() + Environment.NewLine);
        }));
    }
// output is:
//This is end text 1 2 3 4 5 (all in separatę lines).

我的问题:

  1. 为什么在方法 DoSomething 之前显示文本。
  2. 如何解决/避免它,解决它的任何替代方法(使用 Progress 事件除外)。 任何信息将不胜感激。

来自 Clemens 的良好链接见评论。回答您的问题:

  1. Parallel.ForEach 中,您 start/fire 为 numbera 的每个条目分配了一个您不等待的异步任务。所以你只需要等待,Parallel.ForEach 确实完成并且它确实在它的异步任务之前完成。
  2. 你可以做什么删除 Parallel.ForEach 内的 async 并使用 Dispatcher.Invoke 而不是 Dispatcher.BeginInvokeThread.Sleep 是一种反模式 ;),因此根据您的任务可能需要另一个解决方案(已编辑:BionicCode 有一个不错的解决方案):
private async Task DoSomething()
{
   var numbers = new List<int>(Enumerable.Range(1, 5));
   await Task.Run(()=> Parallel.ForEach(numbers, i =>
   {
      Thread.Sleep(i * 300);
      Dispatcher.Invoke(() => tbResults.Text += i.ToString() + Environment.NewLine);
   }));
}

Parallel.Foreach 的线程是“真正的”后台线程。它们被创建并且应用程序继续执行。关键是 Parallel.Foreach 不可等待,因此在使用 await.

挂起 Parallel.Foreach 的线程时继续执行
private async Task DoSomething()
{
    List<int> numbers = new List<int>(Enumerable.Range(1, 5));

    // Create the threads of Parallel.Foreach
    await Task.Run(() => 
      { 
        // Create the threads of Parallel.Foreach and continue
        Parallel.ForEach(numbers,async  i =>
        {
          // await suspends the thread and forces to return.
          // Because Parallel.ForEach is not awaitable, 
          // execution leaves the scope of the Parallel.Foreach to continue.
          await Task.Delay(i * 300);

          await Dispatcher.BeginInvoke(() => tbResults.Text += i.ToString() + Environment.NewLine);
        });

        // After the threads are created the internal await of the Parallel.Foreach suspends background threads and
        // forces to the execution to return from the Parallel.Foreach.
        // The Task.Run thread continues.

        Dispatcher.InvokeAsync(() => tbResults.Text += "Text while Parallel.Foreach threads are suspended");

        // Since the background threads of the Parallel.Foreach are not attached 
        // to the parent Task.Run, the Task.Run completes now and returns
        // i.e. Task.run does not wait for child background threads to complete.
        // ==> Leave Task.Run as there is no work. 
      });

      // Leave DoSomething() and continue to execute the remaining code in the Button_Click(). 
      // Parallel.Foreach threads is still suspended until the await chain, in this case Button_Click(), is completed.
}

解决方案是实施 Clemens 的评论建议的模式或生产者消费者模式的异步实施,例如使用 BlockingCollectionChannel 以获得对固定线程数的更多控制同时分配“无限”数量的工作。

private async Task DoSomething(int number)
{
  await Task.Delay(number * 300);
  Dispatcher.Invoke(() => tbResults.Text += number + Environment.NewLine);
}

private async void ButtonBase_OnClick(object sender, RoutedEventArgs e)
{
  List<int> numbers = new List<int>(Enumerable.Range(1, 5));
  List<Task> tasks = new List<Task>();

  // Alternatively use LINQ Select
  foreach (int number in numbers)
  {
    Task task = DoSomething(number);
    tasks.Add(task);
  }

  await Task.WhenAll(tasks);

  tbResults.Text += "This is end text" + Environment.NewLine;
}

讨论评论

"my intention was to run tasks in parallel and "report" once they are completed i.e. the taks which takes the shortest would "report" first and so on."

这正是上述解决方案中发生的情况。具有最短延迟的 Task 首先将文本附加到 TextBox

"But implementing your suggested await Task.WhenAll(tasks) causes that we need to wait for all tasks to complete and then report all at once."

要按完成顺序处理 Task 个对象,您可以将 Task.WhenAll 替换为 Task.WhenAny。如果您不仅对第一个完成的 Task 感兴趣,则必须以迭代方式使用 Task.WhenAny,直到所有 Task 个实例都已完成:

按完成顺序处理所有任务对象

private async Task DoSomething(int number)
{
  await Task.Delay(number * 300);
  Dispatcher.Invoke(() => tbResults.Text += number + Environment.NewLine);
}

private async void ButtonBase_OnClick(object sender, RoutedEventArgs e)
{
  List<int> numbers = new List<int>(Enumerable.Range(1, 5));
  List<Task> tasks = new List<Task>();

  // Alternatively use LINQ Select
  foreach (int number in numbers)
  {
    Task task = DoSomething(number);
    tasks.Add(task);
  }

  // Until all Tasks have completed
  while (tasks.Any())
  {
    Task<int> nextCompletedTask = await Task.WhenAny(tasks);

    // Remove the completed Task so that
    // we can await the next uncompleted Task that completes first
    tasks.Remove(nextCompletedTask);

    // Get the result of the completed Task
    int taskId = await nextCompletedTask;
    tbResults.Text += $"Task {taskId} has completed." + Environment.NewLine;
  }

  tbResults.Text += "This is end text" + Environment.NewLine;
}

"Parallel.ForEach is not awaitable so I thought that wrapping it up in Task.Run allows me to await it but this is because as you said "Since the background threads of the Parallel.Foreach are not attached to the parent Task.Run""

不,这不是我所说的。关键点是我回答的第三句:”关键是 Parallel.Foreach 不可等待,因此在 Parallel.Foreach 的线程是 时继续执行使用 await..
暂停
这意味着:通常 Parallel.Foreach 同步执行:调用上下文在 Parallel.Foreach 的所有线程完成后继续执行。但是由于您在这些线程中调用了 await,因此您以 async/await 的方式挂起它们。
由于 Parallel.Foreach 不可等待,因此它无法处理 await 调用并且表现得就像挂起的线程已经自然完成一样。 Parallel.Foreach不明白线程只是被await挂起,稍后会继续。换句话说,等待链被打破,因为 Parallel.Foreach 无法 return 将 Task 发送到等待的父 Task.Run 上下文以发出暂停信号。
这就是我说 Parallel.Foreach 的线程没有附加到 Task.Run 的意思。他们 运行 与 async/await 基础设施完全隔离。

"async lambdas should be "only use with events""

不,那是不正确的。当您将异步 lambda 传递给像 Action<T> 这样的 void 委托时,您是正确的:在这种情况下无法等待异步 lambda。但是,当将异步 lambda 传递给 T 类型为 TaskFunc<T> 委托时,可以等待您的 lamda:

private void NoAsyncDelegateSupportedMethod(Action lambda)
{
  // Since Action does not return a Task (return type is always void),
  // the async lambda can't be awaited
  lambda.Invoke();
}

private async Task AsyncDelegateSupportedMethod(Func<Task> asyncLambda)
{
  // Since Func returns a Task, the async lambda can be awaited
  await asyncLambda.Invoke();
}

public voi DoSoemthing()
{
  // Not a good idea as NoAsyncDelegateSupportedMethod can't handle async lamdas: it defines a void delegate
  NoAsyncDelegateSupportedMethod(async () => await Task.Delay(1));

  // A good idea as AsyncDelegateSupportedMethod can handle async lamdas: it defines a Func<Task> delegate
  AsyncDelegateSupportedMethod(async () => await Task.Delay(1));
}

如您所见,您的说法不正确。您必须始终检查被调用方法的签名及其重载。如果它接受 Func<Task> 类型的委托,你就可以开始了。
这就是向 Parallel.ForeachAsync 添加异步支持的方式:API 支持 Func<ValueTask> 类型的委托。例如 Task.Run 接受 Func<Task>,因此下面的调用完全没问题:

Task.Run(async () => await Task.Delay(1));

" I guess that you admit that .Net 6.0 brought the best solution : which is Parallel.ForEachASYNC! [...] We can spawn a couple of threads which deal with our tasks in parallel and we can await the whole loop and we do not need to wait for all tasks to complte- they "report" as they finish "

错了。 Parallel.ForeachAsync 支持使用 async/await 的线程,没错。实际上,您的原始示例将不再破坏预期的流程:因为 Parallel.ForeachAsync 在其线程中支持 await,它可以处理挂起的线程并将 Task 对象从其线程正确传播到调用者上下文,例如包装 await Task.Run.
它现在知道如何等待和恢复挂起的线程。
重要提示:Parallel.ForeachAsync still 完成 AFTER ALL 它的线程已经完成。您假设 “他们在完成后‘报告’” 是错误的。这是 foreach 最直观的并发实现。 foreach 在枚举所有项目后也完成。
在对象完成时处理 Task 对象的解决方案仍然使用上面的 Task.WhenAny 模式。

一般来说,如果您不需要 Parallel.ForeachParallel.ForeachAsync 的分区等额外功能,您可以随时使用 Task.WhenAllTask.WhenAll 尤其是 Parallel.ForeachAsync 是等效的,除了 Parallel.ForeachAsync 默认情况下提供更好的自定义:它支持节流和分区等技术,无需额外代码。