任务可靠性(或者我应该做其他事情吗?)

Task Reliability (or should I be doing something else?)

我有一个使用 Nancy 和 Nancy.Hosting.Self 的 C# 控制台应用程序。

想法是它将通过 Nancy 为 API 提供服务,主应用程序将定期轮询多个连接到各种应用程序 + 在通过 API 请求时从这些连接获取数据(通过南希)。

所以我将有 2 个 运行 进程,持续轮询和 HTTP 服务器。

我的 Program.cs 包含以下片段。

Task pollTask = null;
try {
  pollTask = Task.Run(async () => {
    while (processTask) {
      connectionPool.PollEvents();

      await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
    }
    keepRunning = false;
  }, wtoken.Token);
}
catch (AggregateException ex) {
  Console.WriteLine(ex);
}
catch (System.Threading.Tasks.TaskCanceledException ex) {
  Console.WriteLine("Task Cancelled");
  Console.WriteLine(ex);
}

后来……

using (var host = new Nancy.Hosting.Self.NancyHost(hostConfigs, new Uri(serveUrl))) {
  host.Start();
  // ...
  // routinely checking console for a keypress to quit which then sets
  // processTask to false, which would stop the polling task, which
  // in turn sets keepRunning to false which stops the application entirely.
}

轮询任务似乎 die/stop 没有任何输出到控制台以指示它停止的原因。在检查控制台输入按键时,我还查询了 pollTask.Status,它最终详细说明了 "Faulted"。但我不知道为什么。我也在质疑 long/forever 运行 任务的可靠性。

为了防止这一点含糊不清,我有一个主要问题。 Is Task suitable for a forever 运行 task in the manner in the above.如果不是,我应该使用什么来实现 2 个并行过程,其中一个是 Nancy。

更新(17/07/2018)
到目前为止,在采纳了建议和答案之后,我已经能够确定最终发生的异常并终止进程:

PollEvents process appears to be throwing an exception...
System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
   at System.Net.HttpWebRequest.BeginGetRequestStream(AsyncCallback callback, Object state)
   at System.Net.Http.HttpClientHandler.StartGettingRequestStream(RequestState state)
   at System.Net.Http.HttpClientHandler.PrepareAndStartContentUpload(RequestState state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at ApiProxy.ServiceA.Connection.<>c__DisplayClass22_0.<<Heartbeat>b__0>d.MoveNext() in \api-proxy\src\ServiceA\Connection.cs:line 281
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at ApiProxy.ServiceA.Connection.Heartbeat() in \api-proxy\src\ServiceA\Connection.cs:line 274
   at ApiProxy.ServiceA.Connection.PollEvents(Nullable`1 sinceEventId) in \api-proxy\src\ServiceA\Connection.cs:line 313
   at ApiProxy.ConnectionPool.PollEvents() in \api-proxy\src\ConnectionPool.cs:line 50
   at ApiProxy.Program.<>c.<<Main>b__5_0>d.MoveNext() in \api-proxy\Program.cs:line 172
---> (Inner Exception #0) System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
   at System.Net.HttpWebRequest.BeginGetRequestStream(AsyncCallback callback, Object state)
   at System.Net.Http.HttpClientHandler.StartGettingRequestStream(RequestState state)
   at System.Net.Http.HttpClientHandler.PrepareAndStartContentUpload(RequestState state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at ApiProxy.ServiceA.Connection.<>c__DisplayClass22_0.<<Heartbeat>b__0>d.MoveNext() in \api-proxy\src\ServiceA\Connection.cs:line 281<---

因为 try...catch 现在在 Task 中,这意味着虽然这个错误反复发生并且速度很快,但最终它似乎会自行纠正。然而,在要求更多 ThreadPool 之前能够查询其可用性将是理想的。 ThreadPool 问题看起来也与我的代码无关,而是与 Nancy 无关。

查找错误源后,我确定它发生在以下位置:

public bool Heartbeat() {
  if (connectionConfig.events.heartbeatUrl == "") {
    return false;
  }

  var val = false;
  var task = Task.Run(async () => {
    var heartbeatRequest = new HeartbeatRequest();
    heartbeatRequest.host = connectionConfig.host;
    heartbeatRequest.name = connectionConfig.name;
    heartbeatRequest.eventId = lastEventId;

    var prettyJson = JToken.Parse(JsonConvert.SerializeObject(heartbeatRequest)).ToString(Formatting.Indented);
    var response = await client.PostAsync(connectionConfig.events.heartbeatUrl, new StringContent(prettyJson, Encoding.UTF8, "application/json"));

    // todo: create a heartbeatResponse extending a base response type
    PingResponse heartbeatResponse = JsonConvert.DeserializeObject<PingResponse>(await response.Content.ReadAsStringAsync());
    if (heartbeatResponse != null) {
      Console.WriteLine("Heartbeat: " + heartbeatResponse.message);
      val = heartbeatResponse.success;
    }
    else {
      // todo: sentry?
    }
  });
  task.Wait();

  return val;
}

我将调用包装在 Task 中,否则我最终会得到到处都是 async 定义的海洋。 这是否是 ThreadPool 饥饿的可能来源?

更新 2
通过删除包裹 PostAsyncTask.Run 更正了上面的代码。然后调用代码将调用 Heartbeat().Wait(),因此该方法现在看起来像:

public async Task<bool> Heartbeat() {
  if (connectionConfig.events.heartbeatUrl == "") {
    return false;
  }

  var val = false;
  var heartbeatRequest = new HeartbeatRequest();
  heartbeatRequest.host = connectionConfig.host;
  heartbeatRequest.name = connectionConfig.name;
  heartbeatRequest.eventId = lastEventId;

  var prettyJson = JToken.Parse(JsonConvert.SerializeObject(heartbeatRequest)).ToString(Formatting.Indented);
  var response = await client.PostAsync(connectionConfig.events.heartbeatUrl, new StringContent(prettyJson, Encoding.UTF8, "application/json"));

  PingResponse heartbeatResponse = JsonConvert.DeserializeObject<PingResponse>(await response.Content.ReadAsStringAsync());
  if (heartbeatResponse != null) {
    Console.WriteLine("Heartbeat: " + heartbeatResponse.message);
    val = heartbeatResponse.success;
  }
  else {
    // todo: sentry?
  }

  return val;
}

希望我的一些经验对其他人有所帮助。我还不确定上述更改(有很多这样的更改)是否会防止线程饥饿。

正如评论中指出的那样,您的 try-catch 毫无用处。您需要等待任务完成才能查看是否抛出异常。

作为一个建议,为什么不使用计时器定期轮询而不是 运行 无限循环?

var pollTask = Task.Run(async () => 
 { 
    while (processTask) 
    {
      wToken.ThrowIfCancellationRequested();
      connectionPool.PollEvents();
      await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
    }
  keepRunning = false;
}, wtoken.Token);

try 
{
     pollTask.Wait(wToken);
}
catch( AggregateException ex )
{
    // Handle exception
}

或者,如果您的方法被标记为异步,您可以等待任务。 Await 将为您解包聚合异常。

try 
{
 await Task.Run(async () => 
 { 
    while (processTask) 
    {
      wToken.ThrowIfCancellationRequested();
      connectionPool.PollEvents();
      await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
    }
  keepRunning = false;
}, wtoken.Token);
}
catch( Exception ex )
{
    // Handle exception
}

为了完整起见,您还可以检查后续任务的异常 属性。

Task.Run( async ( ) =>
        {
            wToken.ThrowIfCancellationRequested( );
            connectionPool.PollEvents( );
            await Task.Delay( configLoader.config.connectionPollDelay, wToken );
        }, wToken ).ContinueWith( task =>
        {
            if( task.IsFaulted )
            {
                // Inspect the exception property of the task
                // to view the exception / exceptions.
                Console.WriteLine( task.Exception?.InnerException );
            }
        }, wToken );

当任务失败时...即内部发生异常时

pollTask = Task.Run(async () => {
        while (processTask) {
            connectionPool.PollEvents();

            await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
        }
        keepRunning = false;
    }, wtoken.Token);

异常将存储在返回的任务中。即Task.Exception。任务将被标记为错误。

除非您调用 Task.Wait, await 任务或类似任务,否则不会在您的上下文中抛出此异常。请参阅下文,了解我建议您使用的内容。


很可能 connectionPool.PollEvents(); 最终抛出一些异常,因为我上面提到的,你不会捕捉到。

如果你不能阻止异常,你可能想在任务中处理它......除非我们谈论的是一个条件,这意味着要拆除 connectionPool 或更激烈的事情。

我不知道是什么让 PollEvents 绊倒了。一种可能是您在其他地方使用了相同的对象,并且它不是线程安全的。

说到线程安全。我希望 processTaskkeepRunningvolatile。虽然,正如您将在下面看到的,您并不需要它们。


关于一个较长的运行任务,请使用此方法:

Task.Factory.StartNew(mehtod, TaskCreationOptions.LongRunning);

Factory.StartNew that takes TaskCreationOptions.

的任何重载

在内部,当您使用 TaskCreationOptions.LongRunning 创建任务时,它会将 Thread 专用于您的任务,而不是从线程池 (preventing starvation) 中窃取一个,并确保一切正常使用该设置正确。


Addendum: By StartNew is dangerous, WBuck 表示容易出错。除了启动任务之外,Task.Run 将执行错误处理(我告诉你这样做)并设置 TaskCreationOptions.DenyChildAttach.

还有一个陷阱,它不识别 async 方法(没有重载需要 Func<Task<TResult>> 并且在传递 Func<Task> 时使用 Func<TResult> 是一个陷阱时刻).

因此,直接在 StartNew 中使用异步方法不是一个好主意。当然,天真的做法是将异步方法包装在一个普通的方法中:

Task.Factory.StartNew
(
    ()=>
    {
        awyncMethod().Wait();
    },
    TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach
);

然而,这违背了目的,因为现在异步方法在线程池中,我们创建了一个线程来等待它。

当我们将 async 方法传递给 Factory.StartNew 时,会发生什么情况?我们得到一个代表您实际需要的任务创建的任务。让我们称它为 "faketask"。这个 faketask 将立即完成,您想要的任务就是结果......要正确使用它,您需要向 faketask 添加一个延续(使用适当的创建选项),以检索实际任务。此外,理想情况下,您希望此延续作为实际任务的代理(为您提供它 returns 或抛出的异常)。值得庆幸的是 TaskExtensions.Unwrap 做到了这一切。

因此,我们得出:

Task.Factory.StartNew
(
    async ()=>
    {
        /*...*/
    },
    TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach
).Unwrap();

另见 Parallel Programming with .NET - Task.Run vs Task.Factory.StartNew


如果您打算保留其中的多个,全部取自同一个 connectionPool...首先确保 connectionPool 是线程安全的。除此之外,您需要一种方法来监视任务的状态并重新启动它们。使用另一个长 运行 任务,进入所有这些的 Task.WaitAny 并处理异常 - 并进行日志记录 - 并在应用程序保持活动状态时重新启动它们。 这就是我建议您用来等待任务的方式。

另外,你可以使用CancellationToken退出循环,勾选CancellationToken.IsCancellationRequested. And for knowing that the task stopped, you can check Task.IsCompleted这就是为什么你不需要那些变量

补充:其实你可以用一个CancellationToken把它全部撕下来