Windows 通过触摸基于任务的异步模式 (TAP) 来监控服务作业
Windows Service Job Monitor via Touch of Task-based Asynchronous Pattern (TAP)
我正在用 C# 构建一个 windows 服务,它将监视一个作业队列,当它在队列中找到可用的项目时,它会启动将 'completely' 处理该作业的作业(包括失败)。我正在使用 Task.Factory.StartNew() 并且诚然,我在 TAP 方面非常新手(完成此 post 后前往阅读博客)。
要求
定期轮询数据库队列以查找可用的作业。 (为了这个问题的目的,让我们忽略消息队列与数据库队列的争论)
异步启动作业,以便轮询可以继续监视队列并启动新作业。
遵守 'job' 门槛,这样就不会产生太多的工作。
'Delay' 正在处理作业时关闭服务。
确保作业中的失败记录到事件日志中并且不会使 Windows 服务崩溃。
我的代码在下面,但这是我的 questions/concerns(如果有更好的地方 post 这个,请告诉我)但这主要围绕正确使用 TAP 和 'stability' 的。请注意,我的大部分问题也都记录在代码中。
问题
在 PollJobs 中,我使用 Task.Factory.Start/ContinueWith 的方式是否适合使用此作业处理服务来保持高吞吐量?我永远不会阻塞任何线程,并希望为我目前拥有的一小部分 TAP 使用正确的模式。
ConcurrentDictionary - 使用它来监视当前 运行 个作业,并且每个作业在完成时将其自身从字典中删除(在我假设来自 Task.Factory.StartNew 的单独线程上) ,因为它是 ConcurrentDictionary,我假设我在使用它时不需要任何锁?
作业异常(最严重的异常是 OutOfMemoryException)- 作业处理期间的任何异常 不能 关闭服务并且必须正确记录在事件日志和数据库队列中.不幸的是,目前有些作业可能会抛出 OutOfMemoryException。 'job processing' 中的 try/catch 是否足以捕获和处理所有情况,从而使 Windows 服务永远不会意外终止?还是 better/safer 为每个作业启动一个 AppDomain 以实现更多隔离? (超杀?)
我看到过关于 'proper' 定时器使用的争论,但没有像样的答案。对我的 System.Threading.Timer 的设置和使用有什么意见吗? (特别是围绕我如何确保在上一次调用完成之前不再调用 PollJobs)
如果您已经做到了这一点。提前致谢。
代码
public partial class EvolutionService : ServiceBase
{
EventLog eventLog = new EventLog() {
Source = "BTREvolution",
Log = "Application"
};
Timer timer;
int pollingSeconds = 1;
// Dictionary of currently running jobs so I can query current workload.
// Since ConcurrentDictionary, hoping I don't need any lock() { } code.
private ConcurrentDictionary<Guid, RunningJob> runningJobs =
new ConcurrentDictionary<Guid, RunningJob>();
public EvolutionService( string[] args )
{
InitializeComponent();
if ( !EventLog.SourceExists( eventLog.Source ) )
{
EventLog.CreateEventSource(
eventLog.Source,
eventLog.Log );
}
}
protected override void OnStart( string[] args )
{
// Only run polling code one time and the PollJobs will
// initiate next poll interval so that PollJobs is never
// called again before it finishes its processing,
timer = new System.Threading.Timer(
PollJobs, null,
TimeSpan.FromSeconds( 5 ).Milliseconds,
Timeout.Infinite );
}
protected override void OnPause()
{
// Disable the timer polling so no more jobs are processed
timer = null;
// Don't allow pause if any jobs are running
if ( runningJobs.Count > 0 )
{
var searcher = new System.Management.ManagementObjectSearcher(
"SELECT UserName FROM Win32_ComputerSystem" );
var collection = searcher.Get();
var username =
(string)collection
.Cast<System.Management.ManagementBaseObject>()
.First()[ "UserName" ];
throw new InvalidOperationException( $"{username} requested pause. The service will not process incoming jobs, but it must finish the {runningJobs.Count} job(s) are running before it can be paused." );
}
base.OnPause();
}
protected override void OnContinue()
{
// Tell time to start polling one time in 5 seconds
timer = new System.Threading.Timer(
PollJobs, null,
TimeSpan.FromSeconds( 5 ).Milliseconds,
Timeout.Infinite );
base.OnContinue();
}
protected override void OnStop()
{
// Disable the timer polling so no more jobs are processed
timer = null;
// Until all jobs successfully cancel, keep requesting more time
//
var task = Task.Run( () =>
{
// If any running jobs, send the Cancel notification
if ( runningJobs.Count > 0 )
{
foreach ( var job in runningJobs )
{
eventLog.WriteEntry(
$"Cancelling job {job.Value.Key}" );
job.Value.CancellationSource.Cancel();
}
}
// When a job cancels (and thus completes) it'll
// be removed from the runningJobs workload monitor.
// While any jobs are running, just wait another second
while ( runningJobs.Count > 0 )
{
Task.Delay( TimeSpan.FromSeconds( 1 ) ).Wait();
}
} );
// While the task is not finished, every 5 seconds
// I'll request an additional 5 seconds
while ( !task.Wait( TimeSpan.FromSeconds( 5 ) ) )
{
RequestAdditionalTime(
TimeSpan.FromSeconds( 5 ).Milliseconds );
}
}
public void PollJobs( object state )
{
// If no jobs processed, then poll at regular interval
var timerDue =
TimeSpan.FromSeconds( pollingSeconds ).Milliseconds;
try
{
// Could define some sort of threshhold here so it
// doesn't get too bogged down, might have to check
// Jobs by type to know whether 'batch' or 'single'
// type jobs, for now, just not allowing more than
// 10 jobs to run at once.
var availableProcesses =
Math.Max( 0, 10 - runningJobs.Count );
if ( availableProcesses == 0 ) return;
var availableJobs =
JobProvider.TakeNextAvailableJobs( availableProcesses );
foreach ( var j in availableJobs )
{
// If any jobs processed, poll immediately when finished
timerDue = 0;
var job = new RunningJob
{
Key = j.jKey,
InputPackage = j.JobData.jdInputPackage,
DateStart = j.jDateStart.Value,
CancellationSource = new CancellationTokenSource()
};
// Add running job to the workload monitor
runningJobs.AddOrUpdate(
j.jKey,
job,
( key, info ) => null );
Task.Factory
.StartNew(
i =>
{
var t = (Tuple<Guid, CancellationToken>)i;
var key = t.Item1; // Job Key
// Running process would check if cancel has been requested
var token = t.Item2;
var totalProfilesProcess = 1;
try
{
eventLog.WriteEntry( $"Running job {key}" );
// All code in here completes the jobs.
// Will be a seperate method per JobType.
// Any exceptions in here *MUST NOT*
// take down service. Before allowing
// the exception to propogate back up
// into *this* try/catch, the code must
// successfully clean up any resources
// and state that was being modified so
// that the client who submitted this
// job is properly notified.
// This is just simulation of running a
// job...so each job takes 10 seconds.
for ( var d = 0; d < 10; d++ )
{
// or could do await if I called Unwrap(),
// https://blogs.msdn.microsoft.com/pfxteam/2011/10/24/task-run-vs-task-factory-startnew/
Task.Delay( 1000 ).Wait();
totalProfilesProcess++;
if ( token.IsCancellationRequested )
{
// TODO: Clean up the job
throw new OperationCanceledException( token );
}
}
// Success
JobProvider.UpdateJobStatus( key, 2, totalProfilesProcess );
}
catch ( OperationCanceledException )
{
// Cancelled
JobProvider.UpdateJobStatus( key, 3, totalProfilesProcess );
throw;
}
catch ( Exception )
{
// Failed
JobProvider.UpdateJobStatus( key, 4, totalProfilesProcess );
throw;
}
},
// Pass cancellation token to job so it can watch cancel request
Tuple.Create( j.jKey, job.CancellationSource.Token ),
// associate cancellation token with Task started via StartNew()
job.CancellationSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default
).ContinueWith(
( t, k ) =>
{
// When Job is finished, log exception if present.
// Haven't tested this yet, but think
// Exception will always be AggregateException
// so I'll have to examine the InnerException.
if ( !t.IsCanceled && t.IsFaulted )
{
eventLog.WriteEntry( $"Exception for {k}: {t.Exception.Message}", EventLogEntryType.Error );
}
eventLog.WriteEntry( $"Completed job {k}" );
// Remove running job from the workload monitor
RunningJob completedJob;
runningJobs.TryRemove(
(Guid)k, out completedJob );
},
j.jKey
);
}
}
catch ( Exception ex )
{
// If can't even launch job, disable the polling.
// TODO: Could have an error threshhold where I don't
// shut down until X errors happens
eventLog.WriteEntry(
ex.Message + "\r\n\r\n" + ex.StackTrace,
EventLogEntryType.Error );
timer = null;
}
finally
{
// If timer wasn't 'terminated' in OnPause or OnStop,
// then set to call timer again
if ( timer != null )
{
timer.Change( timerDue, Timeout.Infinite );
}
}
}
}
class RunningJob
{
public Guid Key { get; set; }
public DateTime DateStart { get; set; }
public XElement InputPackage { get; set; }
public CancellationTokenSource CancellationSource { get; set; }
}
我选择 Hangfire.io 作为我的解决方案。
我正在用 C# 构建一个 windows 服务,它将监视一个作业队列,当它在队列中找到可用的项目时,它会启动将 'completely' 处理该作业的作业(包括失败)。我正在使用 Task.Factory.StartNew() 并且诚然,我在 TAP 方面非常新手(完成此 post 后前往阅读博客)。
要求
定期轮询数据库队列以查找可用的作业。 (为了这个问题的目的,让我们忽略消息队列与数据库队列的争论)
异步启动作业,以便轮询可以继续监视队列并启动新作业。
遵守 'job' 门槛,这样就不会产生太多的工作。
'Delay' 正在处理作业时关闭服务。
确保作业中的失败记录到事件日志中并且不会使 Windows 服务崩溃。
我的代码在下面,但这是我的 questions/concerns(如果有更好的地方 post 这个,请告诉我)但这主要围绕正确使用 TAP 和 'stability' 的。请注意,我的大部分问题也都记录在代码中。
问题
在 PollJobs 中,我使用 Task.Factory.Start/ContinueWith 的方式是否适合使用此作业处理服务来保持高吞吐量?我永远不会阻塞任何线程,并希望为我目前拥有的一小部分 TAP 使用正确的模式。
ConcurrentDictionary - 使用它来监视当前 运行 个作业,并且每个作业在完成时将其自身从字典中删除(在我假设来自 Task.Factory.StartNew 的单独线程上) ,因为它是 ConcurrentDictionary,我假设我在使用它时不需要任何锁?
作业异常(最严重的异常是 OutOfMemoryException)- 作业处理期间的任何异常 不能 关闭服务并且必须正确记录在事件日志和数据库队列中.不幸的是,目前有些作业可能会抛出 OutOfMemoryException。 'job processing' 中的 try/catch 是否足以捕获和处理所有情况,从而使 Windows 服务永远不会意外终止?还是 better/safer 为每个作业启动一个 AppDomain 以实现更多隔离? (超杀?)
我看到过关于 'proper' 定时器使用的争论,但没有像样的答案。对我的 System.Threading.Timer 的设置和使用有什么意见吗? (特别是围绕我如何确保在上一次调用完成之前不再调用 PollJobs)
如果您已经做到了这一点。提前致谢。
代码
public partial class EvolutionService : ServiceBase
{
EventLog eventLog = new EventLog() {
Source = "BTREvolution",
Log = "Application"
};
Timer timer;
int pollingSeconds = 1;
// Dictionary of currently running jobs so I can query current workload.
// Since ConcurrentDictionary, hoping I don't need any lock() { } code.
private ConcurrentDictionary<Guid, RunningJob> runningJobs =
new ConcurrentDictionary<Guid, RunningJob>();
public EvolutionService( string[] args )
{
InitializeComponent();
if ( !EventLog.SourceExists( eventLog.Source ) )
{
EventLog.CreateEventSource(
eventLog.Source,
eventLog.Log );
}
}
protected override void OnStart( string[] args )
{
// Only run polling code one time and the PollJobs will
// initiate next poll interval so that PollJobs is never
// called again before it finishes its processing,
timer = new System.Threading.Timer(
PollJobs, null,
TimeSpan.FromSeconds( 5 ).Milliseconds,
Timeout.Infinite );
}
protected override void OnPause()
{
// Disable the timer polling so no more jobs are processed
timer = null;
// Don't allow pause if any jobs are running
if ( runningJobs.Count > 0 )
{
var searcher = new System.Management.ManagementObjectSearcher(
"SELECT UserName FROM Win32_ComputerSystem" );
var collection = searcher.Get();
var username =
(string)collection
.Cast<System.Management.ManagementBaseObject>()
.First()[ "UserName" ];
throw new InvalidOperationException( $"{username} requested pause. The service will not process incoming jobs, but it must finish the {runningJobs.Count} job(s) are running before it can be paused." );
}
base.OnPause();
}
protected override void OnContinue()
{
// Tell time to start polling one time in 5 seconds
timer = new System.Threading.Timer(
PollJobs, null,
TimeSpan.FromSeconds( 5 ).Milliseconds,
Timeout.Infinite );
base.OnContinue();
}
protected override void OnStop()
{
// Disable the timer polling so no more jobs are processed
timer = null;
// Until all jobs successfully cancel, keep requesting more time
//
var task = Task.Run( () =>
{
// If any running jobs, send the Cancel notification
if ( runningJobs.Count > 0 )
{
foreach ( var job in runningJobs )
{
eventLog.WriteEntry(
$"Cancelling job {job.Value.Key}" );
job.Value.CancellationSource.Cancel();
}
}
// When a job cancels (and thus completes) it'll
// be removed from the runningJobs workload monitor.
// While any jobs are running, just wait another second
while ( runningJobs.Count > 0 )
{
Task.Delay( TimeSpan.FromSeconds( 1 ) ).Wait();
}
} );
// While the task is not finished, every 5 seconds
// I'll request an additional 5 seconds
while ( !task.Wait( TimeSpan.FromSeconds( 5 ) ) )
{
RequestAdditionalTime(
TimeSpan.FromSeconds( 5 ).Milliseconds );
}
}
public void PollJobs( object state )
{
// If no jobs processed, then poll at regular interval
var timerDue =
TimeSpan.FromSeconds( pollingSeconds ).Milliseconds;
try
{
// Could define some sort of threshhold here so it
// doesn't get too bogged down, might have to check
// Jobs by type to know whether 'batch' or 'single'
// type jobs, for now, just not allowing more than
// 10 jobs to run at once.
var availableProcesses =
Math.Max( 0, 10 - runningJobs.Count );
if ( availableProcesses == 0 ) return;
var availableJobs =
JobProvider.TakeNextAvailableJobs( availableProcesses );
foreach ( var j in availableJobs )
{
// If any jobs processed, poll immediately when finished
timerDue = 0;
var job = new RunningJob
{
Key = j.jKey,
InputPackage = j.JobData.jdInputPackage,
DateStart = j.jDateStart.Value,
CancellationSource = new CancellationTokenSource()
};
// Add running job to the workload monitor
runningJobs.AddOrUpdate(
j.jKey,
job,
( key, info ) => null );
Task.Factory
.StartNew(
i =>
{
var t = (Tuple<Guid, CancellationToken>)i;
var key = t.Item1; // Job Key
// Running process would check if cancel has been requested
var token = t.Item2;
var totalProfilesProcess = 1;
try
{
eventLog.WriteEntry( $"Running job {key}" );
// All code in here completes the jobs.
// Will be a seperate method per JobType.
// Any exceptions in here *MUST NOT*
// take down service. Before allowing
// the exception to propogate back up
// into *this* try/catch, the code must
// successfully clean up any resources
// and state that was being modified so
// that the client who submitted this
// job is properly notified.
// This is just simulation of running a
// job...so each job takes 10 seconds.
for ( var d = 0; d < 10; d++ )
{
// or could do await if I called Unwrap(),
// https://blogs.msdn.microsoft.com/pfxteam/2011/10/24/task-run-vs-task-factory-startnew/
Task.Delay( 1000 ).Wait();
totalProfilesProcess++;
if ( token.IsCancellationRequested )
{
// TODO: Clean up the job
throw new OperationCanceledException( token );
}
}
// Success
JobProvider.UpdateJobStatus( key, 2, totalProfilesProcess );
}
catch ( OperationCanceledException )
{
// Cancelled
JobProvider.UpdateJobStatus( key, 3, totalProfilesProcess );
throw;
}
catch ( Exception )
{
// Failed
JobProvider.UpdateJobStatus( key, 4, totalProfilesProcess );
throw;
}
},
// Pass cancellation token to job so it can watch cancel request
Tuple.Create( j.jKey, job.CancellationSource.Token ),
// associate cancellation token with Task started via StartNew()
job.CancellationSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default
).ContinueWith(
( t, k ) =>
{
// When Job is finished, log exception if present.
// Haven't tested this yet, but think
// Exception will always be AggregateException
// so I'll have to examine the InnerException.
if ( !t.IsCanceled && t.IsFaulted )
{
eventLog.WriteEntry( $"Exception for {k}: {t.Exception.Message}", EventLogEntryType.Error );
}
eventLog.WriteEntry( $"Completed job {k}" );
// Remove running job from the workload monitor
RunningJob completedJob;
runningJobs.TryRemove(
(Guid)k, out completedJob );
},
j.jKey
);
}
}
catch ( Exception ex )
{
// If can't even launch job, disable the polling.
// TODO: Could have an error threshhold where I don't
// shut down until X errors happens
eventLog.WriteEntry(
ex.Message + "\r\n\r\n" + ex.StackTrace,
EventLogEntryType.Error );
timer = null;
}
finally
{
// If timer wasn't 'terminated' in OnPause or OnStop,
// then set to call timer again
if ( timer != null )
{
timer.Change( timerDue, Timeout.Infinite );
}
}
}
}
class RunningJob
{
public Guid Key { get; set; }
public DateTime DateStart { get; set; }
public XElement InputPackage { get; set; }
public CancellationTokenSource CancellationSource { get; set; }
}
我选择 Hangfire.io 作为我的解决方案。