C# - 使用 ContinueWith 进行错误传播

C# - Error propagation with ContinueWith

我是线程新手,我正在尝试使用 SemaphoreSlim 让我能够同时 运行 设定数量的长任务。

我的挑战是,鉴于我编写它的方式,任何异常都没有被正确捕获。

这是我当前代码的一个非常简化的示例:

public void ThreadTest()
{
    try
    {
        var currentTasks = new List<Task>();
        SemaphoreSlim maxThread = new SemaphoreSlim(2);

        for (int i = 1; i < 5; ++i)
        {
            maxThread.Wait();

            var testTask = Faulty().ContinueWith(tsk => maxThread.Release());
            currentTasks.Add(testTask);
        }

        Task.WaitAll(currentTasks.ToArray());
        Debug.WriteLine("End - We shouldn't have gotten here");
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex.ToString());
    }
}

private async Task Faulty()
{
    throw new Exception("Never reach the awaiter");
    await Task.Factory.StartNew(() => Thread.Sleep(3000));
}

而且,不幸的是,有了 ContinueWith,我得到了 "End - We shouldn't have gotten here" 消息,而不是我想要得到的错误消息。

我如何才能将此代码正确更新为 运行?再次,如果这完全错误,我深表歉意,这是新手尝试将我在网上找到的东西放在一起的尝试 - 非常感谢任何和所有正确执行此操作的建议!!!

这与异步任务中异常的处理方式有关。

根据 Microsoft 的网站 (https://msdn.microsoft.com/en-us/magazine/jj991977.aspx):

When an exception is thrown out of an async Task or async Task method, that exception is captured and placed on the Task object

这意味着当您在异步方法中抛出异常时,它应该获取该异常并将其放置在任务对象本身上。甚至还在网站上举了一个例子,如果返回了一个Task对象,那么异常就不会在主线程抛出,因为它是放在Task对象上的。

听起来您需要检查 Task 对象以查看它是否有效或包含异常。

您可以将 ThreadTest 函数标记为异步并使用:await Faulty();在 try-catch 块内,您将能够捕获异常。

How could I update this code to run correctly?

很简单:不要使用 ContinueWith。使用 await 代替:

public void ThreadTest()
{
  try
  {
    var currentTasks = new List<Task>();
    SemaphoreSlim maxThread = new SemaphoreSlim(2);

    for (int i = 1; i < 5; ++i)
    {
      maxThread.Wait();

      var testTask = TestAsync(maxThread);
      currentTasks.Add(testTask);
    }

    Task.WaitAll(currentTasks.ToArray());
  }
  catch (Exception ex)
  {
    Debug.WriteLine(ex.ToString());
  }
}

private async Task TestAsync(SemaphoreSlim maxThread)
{
  try
  {
    await FaultyAsync();
  }
  finally
  {
    maxThread.Release();
  }
}

private async Task FaultyAsync()
{
  throw new Exception("Never reach the awaiter");
  await Task.Run(() => Thread.Sleep(3000));
}

我还做了一些其他更改:在 async naming convention, and replaced StartNew with Run since StartNew is dangerous 之后添加了一个 Async 后缀(如我在博客中所述)。


代码仍然没有完全正确。你的问题是:你想要异步并发还是并行并发?而这一切都归结为 FaultyAsync.

中的 Task.Run(() => Thread.Sleep(3000))

如果这是真正异步(例如 I/O)操作的占位符,那么 ThreadTest 应该是异步的并使用 Task.WhenAll 而不是 WaitAll,因此:

public async Task TestAsync()
{
  try
  {
    var currentTasks = new List<Task>();
    SemaphoreSlim throttle = new SemaphoreSlim(2); // Not "maxThread" since we're not dealing with threads anymore

    for (int i = 1; i < 5; ++i)
    {
      var testTask = TestAsync(throttle);
      currentTasks.Add(testTask);
    }

    await Task.WhenAll(currentTasks);
  }
  catch (Exception ex)
  {
    Debug.WriteLine(ex.ToString());
  }
}

private async Task TestAsync(SemaphoreSlim throttle)
{
  await throttle.WaitAsync();
  try
  {
    await FaultyAsync();
  }
  finally
  {
    maxThread.Release();
  }
}

private async Task FaultyAsync()
{
  throw new Exception("Never reach the awaiter");
  await Task.Delay(3000); // Naturally asynchronous operation
}

另一方面,如果 Task.Run(() => Thread.Sleep(3000)) 是真正同步(例如,CPU)操作的占位符,那么您应该使用更高级别的并行抽象,而不是创建自己的任务手:

public void ThreadTest()
{
  try
  {
    var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
    Parallel.For(1, 5, options, i => Faulty());
  }
  catch (Exception ex)
  {
    Debug.WriteLine(ex.ToString());
  }
}

private void Faulty()
{
  throw new Exception("Never reach the work");
  Thread.Sleep(3000); // Naturally synchronous operation
}