How to use Azure Batch in an event-based design and terminate/clean up finished jobs
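Using Azure Batch, my project uses an event-based design with Functions and Queues to add jobs to a pool. When a job is finished it is still "active", even though all of its tasks have completed.
A separate Function (running on its own App Service plan) is triggered on a timer and reads X messages from a queue. The Function:
- Creates a pool (if it does not exist)
- Creates a job
- Adds the tasks to that job
This works fine. However, once the tasks are done, the job state stays active, even though all tasks have finished. I would like the job to terminate/clean up/set its state to "completed".
I also want my Functions to be short-lived and stateless, so I am not using foreach (CloudTask task in job.CompletedTasks()) to wait for the state of the tasks.
An alternative is to use task dependencies, which requires batchClient.Utilities.CreateTaskStateMonitor() and is therefore a stateful approach.
What is the best way to use Azure Batch in an event-based design? Specifically, how do I terminate/clean up jobs after their tasks have finished?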
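You can have your job "auto complete" once all tasks under the job have completed. There is a property on the CloudJob object called OnAllTasksComplete.
While you are adding tasks to the job, leave this property set to NoAction (the default). After you have added all tasks to the job, update the value to TerminateJob and then call Commit()/CommitAsync(). Note that if you hold on to the CloudJob you originally submitted, you need to call Refresh()/RefreshAsync() first before you can modify properties and commit. Alternatively, you can call GetJob()/GetJobAsync(), modify the property, and then commit.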
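The GetJob()/GetJobAsync() variant could look roughly like the sketch below. It is only an illustration under assumptions: the class name JobCompletion and the method EnableAutoCompleteAsync are hypothetical, and the caller is assumed to already have an authenticated BatchClient and the id of a job to which every task has been added. Setting TerminateJob only after the tasks are in place matters because a job without tasks counts as having all tasks complete.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Batch;
using Microsoft.Azure.Batch.Common;

public static class JobCompletion
{
    // Hypothetical helper: call it once every task has been added to the job.
    public static async Task EnableAutoCompleteAsync(BatchClient batchClient, string jobId)
    {
        // Fetch a bound CloudJob from the service so its properties are writable.
        CloudJob job = await batchClient.JobOperations.GetJobAsync(jobId);

        // Ask the Batch service to terminate the job when all of its tasks complete.
        job.OnAllTasksComplete = OnAllTasksComplete.TerminateJob;

        // Push the modified property back to the service.
        await job.CommitAsync();
    }
}
```

Because the helper only needs the job id, it keeps no state between invocations, which fits a short-lived Function.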
For an event-based design, you can also look into enabling Batch service analytics and see whether it fits your scenario.
Final solution in code, following fpark's answer:
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Batch;
using Microsoft.Azure.Batch.Common;

public class Orchestrator
{
    // _batchManager, _domain and _fileProcessingTasksFactory are fields declared elsewhere.
    public async Task ExecuteAsync()
    {
        // Create the Batch pool, which contains the compute nodes
        // that execute the tasks.
        var pool = await _batchManager.CreatePoolIfNotExistsAsync();

        // Create the job that runs the tasks.
        var job = await _batchManager.CreateJobIfNotExistsAsync(_domain, pool.Id);

        // Obtain the bound job from the Batch service.
        await job.RefreshAsync();

        // Create a collection of tasks and add them to the Batch job.
        var tasks = await _fileProcessingTasksFactory.CreateAsync(job.Id);

        // Add the tasks to the job; the tasks are automatically scheduled
        // for execution on the nodes by the Batch service.
        await job.AddTaskAsync(tasks);

        // Only now, with all tasks added, let the job terminate itself
        // once every task has completed.
        job.OnAllTasksComplete = OnAllTasksComplete.TerminateJob;
        await job.CommitAsync();
    }
}

public class BatchManager
{
    // _parameters is a field declared elsewhere; it exposes the BatchClient.
    public async Task<CloudPool> CreatePoolIfNotExistsAsync()
    {
        // Code to create and return a pool.
    }

    public async Task<CloudJob> CreateJobIfNotExistsAsync(string domain, string poolId)
    {
        // A job id cannot contain ':', so replace those characters.
        var jobId = $"{domain}-{DateTime.UtcNow:s}".Replace(":", "-");

        var job = _parameters.BatchClient.JobOperations.CreateJob();
        job.Id = jobId;
        job.PoolInformation = new PoolInformation { PoolId = poolId };
        await job.CommitAsync();
        return job;
    }
}
If you try to create the job with OnAllTasksComplete.TerminateJob set right away, you will get the following error:
Microsoft.Azure.Batch: This object is in an invalid state. Write access is not allowed.
2018-03-27 07:57:40.738 +02:00 [Error] "636577269909538505" - Failure while scheduling Azure Batch tasks.
System.InvalidOperationException: This object is in an invalid state. Write access is not allowed.
at Microsoft.Azure.Batch.PropertyAccessor`1.ThrowIfReadOnly(Boolean overrideReadOnly)
at Microsoft.Azure.Batch.PropertyAccessor`1.<>c__DisplayClass19_0.<SetValue>b__0()
at Microsoft.Azure.Batch.PropertyAccessController.WriteProperty(Action propertyWriteAction, BindingAccess allowedAccess, String propertyName)
at Microsoft.Azure.Batch.PropertyAccessor`1.SetValue(T value, Boolean overrideReadOnly, Boolean overrideAccessControl)
at Microsoft.Azure.Batch.CloudJob.set_OnAllTasksComplete(Nullable`1 value)
at BatchManager.CreateJobIfNotExist(String domain, String poolId) in C:\ProjectsGitHub\ProjectName\BatchManager.cs:line 107
at FileProcessingOrchestrator.<ExecuteAsync>d__6.MoveNext() in C:\ProjectsGitHub\ProjectName\FileProcessingOrchestrator.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Nnip.Qrs.EdgarDataProcessing.Parallelization.FunctionApp.ScheduleAzureBatchTasks.<Run>d__0.MoveNext() in C:\ProjectsGitHub\ProjectName\FunctionApp\ScheduleAzureBatchTasks.cs:line 93
Microsoft.Azure.Batch: This object is in an invalid state. Write access is not allowed.
A ScriptHost error has occurred
Exception while executing function: ScheduleAzureBatchTasks. Microsoft.Azure.Batch: This object is in an invalid state. Write access is not allowed.
Exception while executing function: ScheduleAzureBatchTasks
Exception while executing function: ScheduleAzureBatchTasks. Microsoft.Azure.Batch: This object is in an invalid state. Write access is not allowed.
Function completed (Failure, Id=6173b9d2-5058-4a6d-9406-1cf00340774e, Duration=71076ms)
Executed 'ScheduleAzureBatchTasks' (Failed, Id=6173b9d2-5058-4a6d-9406-1cf00340774e)
System.Private.CoreLib: Exception while executing function: ScheduleAzureBatchTasks. Microsoft.Azure.Batch: This object is in an invalid state. Write access is not allowed.
Function had errors. See Azure WebJobs SDK dashboard for details. Instance ID is '6173b9d2-5058-4a6d-9406-1cf00340774e'
System.Private.CoreLib: Exception while executing function: ScheduleAzureBatchTasks. Microsoft.Azure.Batch: This object is in an invalid state. Write access is not allowed.
So set job.OnAllTasksComplete after all tasks have been added.
After all tasks had finished, it took about two minutes (in my case) for the job to set its state to Completed.
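If the completed jobs should also be removed from the Batch account, the cleanup half of the question, a second stateless step could list jobs in the completed state and delete them. The following is a minimal sketch under assumptions, not part of the original solution: CompletedJobCleanup and DeleteCompletedJobsAsync are hypothetical names, the caller is assumed to supply an authenticated BatchClient, and deleting a job also deletes its tasks, so it should only run once the results are no longer needed.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Batch;

public static class CompletedJobCleanup
{
    // Hypothetical helper, e.g. called from a separate timer-triggered Function.
    // Returns the number of delete requests issued.
    public static async Task<int> DeleteCompletedJobsAsync(BatchClient batchClient)
    {
        // Let the service filter for completed jobs and return only their ids.
        var detail = new ODATADetailLevel(
            filterClause: "state eq 'completed'",
            selectClause: "id");

        // Collect the ids first so deletion does not interleave with paging.
        var jobIds = new List<string>();
        foreach (CloudJob job in batchClient.JobOperations.ListJobs(detail))
        {
            jobIds.Add(job.Id);
        }

        foreach (string jobId in jobIds)
        {
            await batchClient.JobOperations.DeleteJobAsync(jobId);
        }

        return jobIds.Count;
    }
}
```

Since the filter is evaluated by the service on every call, the helper needs no record of which jobs it created.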