任务可靠性(或者我应该做其他事情吗?)
Task Reliability (or should I be doing something else?)
我有一个使用 Nancy 和 Nancy.Hosting.Self 的 C# 控制台应用程序。
想法是它将通过 Nancy 为 API 提供服务,主应用程序将定期轮询多个连接到各种应用程序 + 在通过 API 请求时从这些连接获取数据(通过南希)。
所以我将有 2 个 运行 进程,持续轮询和 HTTP 服务器。
我的 Program.cs 包含以下片段。
Task pollTask = null;
try {
pollTask = Task.Run(async () => {
while (processTask) {
connectionPool.PollEvents();
await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
}
keepRunning = false;
}, wtoken.Token);
}
catch (AggregateException ex) {
Console.WriteLine(ex);
}
catch (System.Threading.Tasks.TaskCanceledException ex) {
Console.WriteLine("Task Cancelled");
Console.WriteLine(ex);
}
后来……
using (var host = new Nancy.Hosting.Self.NancyHost(hostConfigs, new Uri(serveUrl))) {
host.Start();
// ...
// routinely checking console for a keypress to quit which then sets
// processTask to false, which would stop the polling task, which
// in turn sets keepRunning to false which stops the application entirely.
}
轮询任务似乎 die/stop 没有任何输出到控制台以指示它停止的原因。在检查控制台输入按键时,我还查询了 pollTask.Status,它最终详细说明了 "Faulted"。但我不知道为什么。我也在质疑 long/forever 运行 任务的可靠性。
为了防止这一点含糊不清,我有一个主要问题。 Is Task suitable for a forever 运行 task in the manner in the above.如果不是,我应该使用什么来实现 2 个并行过程,其中一个是 Nancy。
更新(17/07/2018):
到目前为止,在采纳了建议和答案之后,我已经能够确定最终发生的异常并终止进程:
PollEvents process appears to be throwing an exception...
System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
at System.Net.HttpWebRequest.BeginGetRequestStream(AsyncCallback callback, Object state)
at System.Net.Http.HttpClientHandler.StartGettingRequestStream(RequestState state)
at System.Net.Http.HttpClientHandler.PrepareAndStartContentUpload(RequestState state)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ApiProxy.ServiceA.Connection.<>c__DisplayClass22_0.<<Heartbeat>b__0>d.MoveNext() in \api-proxy\src\ServiceA\Connection.cs:line 281
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at ApiProxy.ServiceA.Connection.Heartbeat() in \api-proxy\src\ServiceA\Connection.cs:line 274
at ApiProxy.ServiceA.Connection.PollEvents(Nullable`1 sinceEventId) in \api-proxy\src\ServiceA\Connection.cs:line 313
at ApiProxy.ConnectionPool.PollEvents() in \api-proxy\src\ConnectionPool.cs:line 50
at ApiProxy.Program.<>c.<<Main>b__5_0>d.MoveNext() in \api-proxy\Program.cs:line 172
---> (Inner Exception #0) System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
at System.Net.HttpWebRequest.BeginGetRequestStream(AsyncCallback callback, Object state)
at System.Net.Http.HttpClientHandler.StartGettingRequestStream(RequestState state)
at System.Net.Http.HttpClientHandler.PrepareAndStartContentUpload(RequestState state)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ApiProxy.ServiceA.Connection.<>c__DisplayClass22_0.<<Heartbeat>b__0>d.MoveNext() in \api-proxy\src\ServiceA\Connection.cs:line 281<---
因为 try...catch
现在在 Task 中,这意味着虽然这个错误反复发生并且速度很快,但最终它似乎会自行纠正。然而,在要求更多 ThreadPool 之前能够查询其可用性将是理想的。 ThreadPool 问题看起来也与我的代码无关,而是与 Nancy 无关。
查找错误源后,我确定它发生在以下位置:
public bool Heartbeat() {
if (connectionConfig.events.heartbeatUrl == "") {
return false;
}
var val = false;
var task = Task.Run(async () => {
var heartbeatRequest = new HeartbeatRequest();
heartbeatRequest.host = connectionConfig.host;
heartbeatRequest.name = connectionConfig.name;
heartbeatRequest.eventId = lastEventId;
var prettyJson = JToken.Parse(JsonConvert.SerializeObject(heartbeatRequest)).ToString(Formatting.Indented);
var response = await client.PostAsync(connectionConfig.events.heartbeatUrl, new StringContent(prettyJson, Encoding.UTF8, "application/json"));
// todo: create a heartbeatResponse extending a base response type
PingResponse heartbeatResponse = JsonConvert.DeserializeObject<PingResponse>(await response.Content.ReadAsStringAsync());
if (heartbeatResponse != null) {
Console.WriteLine("Heartbeat: " + heartbeatResponse.message);
val = heartbeatResponse.success;
}
else {
// todo: sentry?
}
});
task.Wait();
return val;
}
我将调用包装在 Task
中,否则我最终会得到到处都是 async
定义的海洋。 这是否是 ThreadPool 饥饿的可能来源?
更新 2
通过删除包裹 PostAsync
的 Task.Run
更正了上面的代码。然后调用代码将调用 Heartbeat().Wait()
,因此该方法现在看起来像:
public async Task<bool> Heartbeat() {
if (connectionConfig.events.heartbeatUrl == "") {
return false;
}
var val = false;
var heartbeatRequest = new HeartbeatRequest();
heartbeatRequest.host = connectionConfig.host;
heartbeatRequest.name = connectionConfig.name;
heartbeatRequest.eventId = lastEventId;
var prettyJson = JToken.Parse(JsonConvert.SerializeObject(heartbeatRequest)).ToString(Formatting.Indented);
var response = await client.PostAsync(connectionConfig.events.heartbeatUrl, new StringContent(prettyJson, Encoding.UTF8, "application/json"));
PingResponse heartbeatResponse = JsonConvert.DeserializeObject<PingResponse>(await response.Content.ReadAsStringAsync());
if (heartbeatResponse != null) {
Console.WriteLine("Heartbeat: " + heartbeatResponse.message);
val = heartbeatResponse.success;
}
else {
// todo: sentry?
}
return val;
}
希望我的一些经验对其他人有所帮助。我还不确定上述更改(有很多这样的更改)是否会防止线程饥饿。
正如评论中指出的那样,您的 try-catch
毫无用处。您需要等待任务完成才能查看是否抛出异常。
作为一个建议,为什么不使用计时器定期轮询而不是 运行 无限循环?
var pollTask = Task.Run(async () =>
{
while (processTask)
{
wToken.ThrowIfCancellationRequested();
connectionPool.PollEvents();
await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
}
keepRunning = false;
}, wtoken.Token);
try
{
pollTask.Wait(wToken);
}
catch( AggregateException ex )
{
// Handle exception
}
或者,如果您的方法被标记为异步,您可以等待任务。 Await 将为您解包聚合异常。
try
{
await Task.Run(async () =>
{
while (processTask)
{
wToken.ThrowIfCancellationRequested();
connectionPool.PollEvents();
await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
}
keepRunning = false;
}, wtoken.Token);
}
catch( Exception ex )
{
// Handle exception
}
为了完整起见,您还可以检查后续任务的异常 属性。
Task.Run( async ( ) =>
{
wToken.ThrowIfCancellationRequested( );
connectionPool.PollEvents( );
await Task.Delay( configLoader.config.connectionPollDelay, wToken );
}, wToken ).ContinueWith( task =>
{
if( task.IsFaulted )
{
// Inspect the exception property of the task
// to view the exception / exceptions.
Console.WriteLine( task.Exception?.InnerException );
}
}, wToken );
当任务失败时...即内部发生异常时
pollTask = Task.Run(async () => {
while (processTask) {
connectionPool.PollEvents();
await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
}
keepRunning = false;
}, wtoken.Token);
异常将存储在返回的任务中。即Task.Exception
。任务将被标记为错误。
除非您调用 Task.Wait
, await
任务或类似任务,否则不会在您的上下文中抛出此异常。请参阅下文,了解我建议您使用的内容。
很可能 connectionPool.PollEvents();
最终抛出一些异常,因为我上面提到的,你不会捕捉到。
如果你不能阻止异常,你可能想在任务中处理它......除非我们谈论的是一个条件,这意味着要拆除 connectionPool
或更激烈的事情。
我不知道是什么让 PollEvents
绊倒了。一种可能是您在其他地方使用了相同的对象,并且它不是线程安全的。
说到线程安全。我希望 processTask
和 keepRunning
是 volatile。虽然,正如您将在下面看到的,您并不需要它们。
关于一个较长的运行任务,请使用此方法:
Task.Factory.StartNew(mehtod, TaskCreationOptions.LongRunning);
或 Factory.StartNew
that takes TaskCreationOptions
.
的任何重载
在内部,当您使用 TaskCreationOptions.LongRunning
创建任务时,它会将 Thread
专用于您的任务,而不是从线程池 (preventing starvation) 中窃取一个,并确保一切正常使用该设置正确。
Addendum: By StartNew is dangerous, WBuck 表示容易出错。除了启动任务之外,Task.Run
将执行错误处理(我告诉你这样做)并设置 TaskCreationOptions.DenyChildAttach
.
还有一个陷阱,它不识别 async
方法(没有重载需要 Func<Task<TResult>>
并且在传递 Func<Task>
时使用 Func<TResult>
是一个陷阱时刻).
因此,直接在 StartNew
中使用异步方法不是一个好主意。当然,天真的做法是将异步方法包装在一个普通的方法中:
Task.Factory.StartNew
(
()=>
{
awyncMethod().Wait();
},
TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach
);
然而,这违背了目的,因为现在异步方法在线程池中,我们创建了一个线程来等待它。
当我们将 async
方法传递给 Factory.StartNew
时,会发生什么情况?我们得到一个代表您实际需要的任务创建的任务。让我们称它为 "faketask"。这个 faketask 将立即完成,您想要的任务就是结果......要正确使用它,您需要向 faketask 添加一个延续(使用适当的创建选项),以检索实际任务。此外,理想情况下,您希望此延续作为实际任务的代理(为您提供它 returns 或抛出的异常)。值得庆幸的是 TaskExtensions.Unwrap
做到了这一切。
因此,我们得出:
Task.Factory.StartNew
(
async ()=>
{
/*...*/
},
TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach
).Unwrap();
另见 Parallel Programming with .NET - Task.Run vs Task.Factory.StartNew。
如果您打算保留其中的多个,全部取自同一个 connectionPool
...首先确保 connectionPool
是线程安全的。除此之外,您需要一种方法来监视任务的状态并重新启动它们。使用另一个长 运行 任务,进入所有这些的 Task.WaitAny
并处理异常 - 并进行日志记录 - 并在应用程序保持活动状态时重新启动它们。 这就是我建议您用来等待任务的方式。
另外,你可以使用CancellationToken
退出循环,勾选CancellationToken.IsCancellationRequested
. And for knowing that the task stopped, you can check Task.IsCompleted
。 这就是为什么你不需要那些变量。
补充:其实你可以用一个CancellationToken
把它全部撕下来
我有一个使用 Nancy 和 Nancy.Hosting.Self 的 C# 控制台应用程序。
想法是它将通过 Nancy 为 API 提供服务,主应用程序将定期轮询多个连接到各种应用程序 + 在通过 API 请求时从这些连接获取数据(通过南希)。
所以我将有 2 个 运行 进程,持续轮询和 HTTP 服务器。
我的 Program.cs 包含以下片段。
Task pollTask = null;
try {
pollTask = Task.Run(async () => {
while (processTask) {
connectionPool.PollEvents();
await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
}
keepRunning = false;
}, wtoken.Token);
}
catch (AggregateException ex) {
Console.WriteLine(ex);
}
catch (System.Threading.Tasks.TaskCanceledException ex) {
Console.WriteLine("Task Cancelled");
Console.WriteLine(ex);
}
后来……
using (var host = new Nancy.Hosting.Self.NancyHost(hostConfigs, new Uri(serveUrl))) {
host.Start();
// ...
// routinely checking console for a keypress to quit which then sets
// processTask to false, which would stop the polling task, which
// in turn sets keepRunning to false which stops the application entirely.
}
轮询任务似乎 die/stop 没有任何输出到控制台以指示它停止的原因。在检查控制台输入按键时,我还查询了 pollTask.Status,它最终详细说明了 "Faulted"。但我不知道为什么。我也在质疑 long/forever 运行 任务的可靠性。
为了防止这一点含糊不清,我有一个主要问题。 Is Task suitable for a forever 运行 task in the manner in the above.如果不是,我应该使用什么来实现 2 个并行过程,其中一个是 Nancy。
更新(17/07/2018):
到目前为止,在采纳了建议和答案之后,我已经能够确定最终发生的异常并终止进程:
PollEvents process appears to be throwing an exception...
System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
at System.Net.HttpWebRequest.BeginGetRequestStream(AsyncCallback callback, Object state)
at System.Net.Http.HttpClientHandler.StartGettingRequestStream(RequestState state)
at System.Net.Http.HttpClientHandler.PrepareAndStartContentUpload(RequestState state)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ApiProxy.ServiceA.Connection.<>c__DisplayClass22_0.<<Heartbeat>b__0>d.MoveNext() in \api-proxy\src\ServiceA\Connection.cs:line 281
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at ApiProxy.ServiceA.Connection.Heartbeat() in \api-proxy\src\ServiceA\Connection.cs:line 274
at ApiProxy.ServiceA.Connection.PollEvents(Nullable`1 sinceEventId) in \api-proxy\src\ServiceA\Connection.cs:line 313
at ApiProxy.ConnectionPool.PollEvents() in \api-proxy\src\ConnectionPool.cs:line 50
at ApiProxy.Program.<>c.<<Main>b__5_0>d.MoveNext() in \api-proxy\Program.cs:line 172
---> (Inner Exception #0) System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
at System.Net.HttpWebRequest.BeginGetRequestStream(AsyncCallback callback, Object state)
at System.Net.Http.HttpClientHandler.StartGettingRequestStream(RequestState state)
at System.Net.Http.HttpClientHandler.PrepareAndStartContentUpload(RequestState state)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ApiProxy.ServiceA.Connection.<>c__DisplayClass22_0.<<Heartbeat>b__0>d.MoveNext() in \api-proxy\src\ServiceA\Connection.cs:line 281<---
因为 try...catch
现在在 Task 中,这意味着虽然这个错误反复发生并且速度很快,但最终它似乎会自行纠正。然而,在要求更多 ThreadPool 之前能够查询其可用性将是理想的。 ThreadPool 问题看起来也与我的代码无关,而是与 Nancy 无关。
查找错误源后,我确定它发生在以下位置:
public bool Heartbeat() {
if (connectionConfig.events.heartbeatUrl == "") {
return false;
}
var val = false;
var task = Task.Run(async () => {
var heartbeatRequest = new HeartbeatRequest();
heartbeatRequest.host = connectionConfig.host;
heartbeatRequest.name = connectionConfig.name;
heartbeatRequest.eventId = lastEventId;
var prettyJson = JToken.Parse(JsonConvert.SerializeObject(heartbeatRequest)).ToString(Formatting.Indented);
var response = await client.PostAsync(connectionConfig.events.heartbeatUrl, new StringContent(prettyJson, Encoding.UTF8, "application/json"));
// todo: create a heartbeatResponse extending a base response type
PingResponse heartbeatResponse = JsonConvert.DeserializeObject<PingResponse>(await response.Content.ReadAsStringAsync());
if (heartbeatResponse != null) {
Console.WriteLine("Heartbeat: " + heartbeatResponse.message);
val = heartbeatResponse.success;
}
else {
// todo: sentry?
}
});
task.Wait();
return val;
}
我将调用包装在 Task
中,否则我最终会得到到处都是 async
定义的海洋。 这是否是 ThreadPool 饥饿的可能来源?
更新 2
通过删除包裹 PostAsync
的 Task.Run
更正了上面的代码。然后调用代码将调用 Heartbeat().Wait()
,因此该方法现在看起来像:
public async Task<bool> Heartbeat() {
if (connectionConfig.events.heartbeatUrl == "") {
return false;
}
var val = false;
var heartbeatRequest = new HeartbeatRequest();
heartbeatRequest.host = connectionConfig.host;
heartbeatRequest.name = connectionConfig.name;
heartbeatRequest.eventId = lastEventId;
var prettyJson = JToken.Parse(JsonConvert.SerializeObject(heartbeatRequest)).ToString(Formatting.Indented);
var response = await client.PostAsync(connectionConfig.events.heartbeatUrl, new StringContent(prettyJson, Encoding.UTF8, "application/json"));
PingResponse heartbeatResponse = JsonConvert.DeserializeObject<PingResponse>(await response.Content.ReadAsStringAsync());
if (heartbeatResponse != null) {
Console.WriteLine("Heartbeat: " + heartbeatResponse.message);
val = heartbeatResponse.success;
}
else {
// todo: sentry?
}
return val;
}
希望我的一些经验对其他人有所帮助。我还不确定上述更改(有很多这样的更改)是否会防止线程饥饿。
正如评论中指出的那样,您的 try-catch
毫无用处。您需要等待任务完成才能查看是否抛出异常。
作为一个建议,为什么不使用计时器定期轮询而不是 运行 无限循环?
var pollTask = Task.Run(async () =>
{
while (processTask)
{
wToken.ThrowIfCancellationRequested();
connectionPool.PollEvents();
await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
}
keepRunning = false;
}, wtoken.Token);
try
{
pollTask.Wait(wToken);
}
catch( AggregateException ex )
{
// Handle exception
}
或者,如果您的方法被标记为异步,您可以等待任务。 Await 将为您解包聚合异常。
try
{
await Task.Run(async () =>
{
while (processTask)
{
wToken.ThrowIfCancellationRequested();
connectionPool.PollEvents();
await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
}
keepRunning = false;
}, wtoken.Token);
}
catch( Exception ex )
{
// Handle exception
}
为了完整起见,您还可以检查后续任务的异常 属性。
Task.Run( async ( ) =>
{
wToken.ThrowIfCancellationRequested( );
connectionPool.PollEvents( );
await Task.Delay( configLoader.config.connectionPollDelay, wToken );
}, wToken ).ContinueWith( task =>
{
if( task.IsFaulted )
{
// Inspect the exception property of the task
// to view the exception / exceptions.
Console.WriteLine( task.Exception?.InnerException );
}
}, wToken );
当任务失败时...即内部发生异常时
pollTask = Task.Run(async () => {
while (processTask) {
connectionPool.PollEvents();
await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
}
keepRunning = false;
}, wtoken.Token);
异常将存储在返回的任务中。即Task.Exception
。任务将被标记为错误。
除非您调用 Task.Wait
, await
任务或类似任务,否则不会在您的上下文中抛出此异常。请参阅下文,了解我建议您使用的内容。
很可能 connectionPool.PollEvents();
最终抛出一些异常,因为我上面提到的,你不会捕捉到。
如果你不能阻止异常,你可能想在任务中处理它......除非我们谈论的是一个条件,这意味着要拆除 connectionPool
或更激烈的事情。
我不知道是什么让 PollEvents
绊倒了。一种可能是您在其他地方使用了相同的对象,并且它不是线程安全的。
说到线程安全。我希望 processTask
和 keepRunning
是 volatile。虽然,正如您将在下面看到的,您并不需要它们。
关于一个较长的运行任务,请使用此方法:
Task.Factory.StartNew(mehtod, TaskCreationOptions.LongRunning);
或 Factory.StartNew
that takes TaskCreationOptions
.
在内部,当您使用 TaskCreationOptions.LongRunning
创建任务时,它会将 Thread
专用于您的任务,而不是从线程池 (preventing starvation) 中窃取一个,并确保一切正常使用该设置正确。
Addendum: By StartNew is dangerous, WBuck 表示容易出错。除了启动任务之外,Task.Run
将执行错误处理(我告诉你这样做)并设置 TaskCreationOptions.DenyChildAttach
.
还有一个陷阱,它不识别 async
方法(没有重载需要 Func<Task<TResult>>
并且在传递 Func<Task>
时使用 Func<TResult>
是一个陷阱时刻).
因此,直接在 StartNew
中使用异步方法不是一个好主意。当然,天真的做法是将异步方法包装在一个普通的方法中:
Task.Factory.StartNew
(
()=>
{
awyncMethod().Wait();
},
TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach
);
然而,这违背了目的,因为现在异步方法在线程池中,我们创建了一个线程来等待它。
当我们将 async
方法传递给 Factory.StartNew
时,会发生什么情况?我们得到一个代表您实际需要的任务创建的任务。让我们称它为 "faketask"。这个 faketask 将立即完成,您想要的任务就是结果......要正确使用它,您需要向 faketask 添加一个延续(使用适当的创建选项),以检索实际任务。此外,理想情况下,您希望此延续作为实际任务的代理(为您提供它 returns 或抛出的异常)。值得庆幸的是 TaskExtensions.Unwrap
做到了这一切。
因此,我们得出:
Task.Factory.StartNew
(
async ()=>
{
/*...*/
},
TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach
).Unwrap();
另见 Parallel Programming with .NET - Task.Run vs Task.Factory.StartNew。
如果您打算保留其中的多个,全部取自同一个 connectionPool
...首先确保 connectionPool
是线程安全的。除此之外,您需要一种方法来监视任务的状态并重新启动它们。使用另一个长 运行 任务,进入所有这些的 Task.WaitAny
并处理异常 - 并进行日志记录 - 并在应用程序保持活动状态时重新启动它们。 这就是我建议您用来等待任务的方式。
另外,你可以使用CancellationToken
退出循环,勾选CancellationToken.IsCancellationRequested
. And for knowing that the task stopped, you can check Task.IsCompleted
。 这就是为什么你不需要那些变量。
补充:其实你可以用一个CancellationToken
把它全部撕下来