Azure 以太编排 - 取消计时器
Azure ethernal orchestration - Cancel timer
我正在使用带有任务池的 Azure Ethernal Orchestrations。
我有很多任务要执行,有一些优先级。我有一项服务给我最优先的任务。
但有时我需要等到特定日期才能执行下一个任务。
所以我用永恒的编排制作了一种看门狗。
我的问题是当我需要取消编排时,我也需要取消定时器。
但是当我这样做的时候,函数仍然处于 运行 状态并且我的 OutputStatus 没有改变。
这是我的编排功能:
[FunctionName(nameof(RunTaskWatchdog))]
public async Task RunTaskWatchdog(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var poolName = context.GetInput<string>();
TaskStatus output = new()
{
PoolName = poolName,
Message = "Getting next task"
};
context.SetCustomStatus(output);
// Retrieve the next task to execute
var task = await context.CallActivityAsync<TaskExecution>(nameof(Activities.GetNextResearchForExecution), poolName);
if (task == null)
{
output.Message = "Unable to get a task";
context.SetCustomStatus(output);
return;
}
if (string.IsNullOrEmpty(task.TaskName))
{
output.Message = task.Error;
context.SetCustomStatus(output);
return;
}
output.Message = "Got task";
output.CurrentTask = task;
if (task.NextExecution.HasValue)
{
// If the next task can't be executed before a date, sleep till the good date
output.Message = "Need to wait to respect task delay";
output.NextExcecution = task.NextExecution.Value;
context.SetCustomStatus(output);
try
{
await context.CreateTimer(task.NextExecution.Value, tokens[poolName].Token);
}
catch (OperationCanceledException)
{
output.Message = "Operation cancelled while sleeping";
output.NextExcecution = null;
context.SetCustomStatus(output);
return;
// After this line, the CustomStatus has not changed, and the orchestration is still in running state
}
}
else
{
context.SetCustomStatus(output);
}
var executionSuccess = await context.CallActivityAsync<bool>(nameof(Activities.ExecuteResearchActivity), task.TaskName);
var doneSuccess = await context.CallActivityAsync<bool>(nameof(Activities.SetExecutingTaskDone), poolName);
DateTime nextTask = context.CurrentUtcDateTime.AddSeconds(10);
output.Message = "Waiting next execution";
output.NextExcecution = nextTask;
output.CurrentTask = null;
context.SetCustomStatus(output);
try
{
await context.CreateTimer(nextTask, tokens[poolName].Token);
}
catch (OperationCanceledException)
{
output.Message = "Operation cancelled";
output.NextExcecution = null;
context.SetCustomStatus(output);
return;
}
context.ContinueAsNew(poolName);
}
如您所见,在 catch 部分,我在调试中输入了这个,但是当我使用 Postman (http://localhost:7071/runtime/webhooks/durabletask/instances)
我在这里提供了示例代码:https://github.com/Rizov74/TestDurableOrchestrator
好的,我找到了使用外部事件的方法:
而不是
try
{
await context.CreateTimer(task.NextExecution.Value, tokens[poolName].Token);
}
catch (OperationCanceledException)
{
output.Message = "Operation cancelled while sleeping";
output.NextExcecution = null;
context.SetCustomStatus(output);
return;
// After this line, the CustomStatus has not changed, and the orchestration is still in running state
}
我使用:
using (var cts = new CancellationTokenSource())
{
Task sleepingTask = context.CreateTimer(task.NextExecution.Value, output, cts.Token);
Task timeoutTask = context.WaitForExternalEvent("stop");
Task winner = await Task.WhenAny(sleepingTask, timeoutTask);
if (winner == sleepingTask)
{
// Can continue
}
else
{
// Cancel timer token
cts.Cancel();
if (!context.IsReplaying)
Console.WriteLine($"{context.InstanceId}: wait cancelled {poolName} : {task.TaskName}");
output.Message = "Operation cancelled while sleeping";
output.NextExcecution = null;
context.SetCustomStatus(output);
// If the watchdog has been cancelled, we need to telle the pool the task is cancelled
var ok = await context.CallActivityAsync<bool>(nameof(Activities.CancelExecutingTask), poolName);
if (!context.IsReplaying)
Console.WriteLine($"{context.InstanceId}: task correctly canceled {poolName} : {task.TaskName}");
// Stop the watchdog
return;
}
}
我已经在 github 上更新了我的 repo,如果你想要持久协调器的复杂示例,你可以看看
我正在使用带有任务池的 Azure Ethernal Orchestrations。 我有很多任务要执行,有一些优先级。我有一项服务给我最优先的任务。 但有时我需要等到特定日期才能执行下一个任务。
所以我用永恒的编排制作了一种看门狗。
我的问题是当我需要取消编排时,我也需要取消定时器。 但是当我这样做的时候,函数仍然处于 运行 状态并且我的 OutputStatus 没有改变。
这是我的编排功能:
[FunctionName(nameof(RunTaskWatchdog))]
public async Task RunTaskWatchdog(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var poolName = context.GetInput<string>();
TaskStatus output = new()
{
PoolName = poolName,
Message = "Getting next task"
};
context.SetCustomStatus(output);
// Retrieve the next task to execute
var task = await context.CallActivityAsync<TaskExecution>(nameof(Activities.GetNextResearchForExecution), poolName);
if (task == null)
{
output.Message = "Unable to get a task";
context.SetCustomStatus(output);
return;
}
if (string.IsNullOrEmpty(task.TaskName))
{
output.Message = task.Error;
context.SetCustomStatus(output);
return;
}
output.Message = "Got task";
output.CurrentTask = task;
if (task.NextExecution.HasValue)
{
// If the next task can't be executed before a date, sleep till the good date
output.Message = "Need to wait to respect task delay";
output.NextExcecution = task.NextExecution.Value;
context.SetCustomStatus(output);
try
{
await context.CreateTimer(task.NextExecution.Value, tokens[poolName].Token);
}
catch (OperationCanceledException)
{
output.Message = "Operation cancelled while sleeping";
output.NextExcecution = null;
context.SetCustomStatus(output);
return;
// After this line, the CustomStatus has not changed, and the orchestration is still in running state
}
}
else
{
context.SetCustomStatus(output);
}
var executionSuccess = await context.CallActivityAsync<bool>(nameof(Activities.ExecuteResearchActivity), task.TaskName);
var doneSuccess = await context.CallActivityAsync<bool>(nameof(Activities.SetExecutingTaskDone), poolName);
DateTime nextTask = context.CurrentUtcDateTime.AddSeconds(10);
output.Message = "Waiting next execution";
output.NextExcecution = nextTask;
output.CurrentTask = null;
context.SetCustomStatus(output);
try
{
await context.CreateTimer(nextTask, tokens[poolName].Token);
}
catch (OperationCanceledException)
{
output.Message = "Operation cancelled";
output.NextExcecution = null;
context.SetCustomStatus(output);
return;
}
context.ContinueAsNew(poolName);
}
如您所见,在 catch 部分,我在调试中输入了这个,但是当我使用 Postman (http://localhost:7071/runtime/webhooks/durabletask/instances)
我在这里提供了示例代码:https://github.com/Rizov74/TestDurableOrchestrator
好的,我找到了使用外部事件的方法:
而不是
try
{
await context.CreateTimer(task.NextExecution.Value, tokens[poolName].Token);
}
catch (OperationCanceledException)
{
output.Message = "Operation cancelled while sleeping";
output.NextExcecution = null;
context.SetCustomStatus(output);
return;
// After this line, the CustomStatus has not changed, and the orchestration is still in running state
}
我使用:
using (var cts = new CancellationTokenSource())
{
Task sleepingTask = context.CreateTimer(task.NextExecution.Value, output, cts.Token);
Task timeoutTask = context.WaitForExternalEvent("stop");
Task winner = await Task.WhenAny(sleepingTask, timeoutTask);
if (winner == sleepingTask)
{
// Can continue
}
else
{
// Cancel timer token
cts.Cancel();
if (!context.IsReplaying)
Console.WriteLine($"{context.InstanceId}: wait cancelled {poolName} : {task.TaskName}");
output.Message = "Operation cancelled while sleeping";
output.NextExcecution = null;
context.SetCustomStatus(output);
// If the watchdog has been cancelled, we need to telle the pool the task is cancelled
var ok = await context.CallActivityAsync<bool>(nameof(Activities.CancelExecutingTask), poolName);
if (!context.IsReplaying)
Console.WriteLine($"{context.InstanceId}: task correctly canceled {poolName} : {task.TaskName}");
// Stop the watchdog
return;
}
}
我已经在 github 上更新了我的 repo,如果你想要持久协调器的复杂示例,你可以看看