Azure eternal orchestration - Cancel timer

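I am using Azure eternal orchestrations with a pool of tasks. I have many tasks to execute, each with a priority, and a service that gives me the highest-priority task. Sometimes, though, I need to wait until a specific date before executing the next task.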

So I built a kind of watchdog with an eternal orchestration.

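My problem is that when I need to cancel the orchestration, I also need to cancel the timer. But when I do that, the function stays in the Running state and my OutputStatus does not change.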

Here is my orchestration function:

    [FunctionName(nameof(RunTaskWatchdog))]
    public async Task RunTaskWatchdog(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var poolName = context.GetInput<string>();
        TaskStatus output = new()
        {
            PoolName = poolName,
            Message = "Getting next task"
        };
        context.SetCustomStatus(output);

        // Retrieve the next task to execute
        var task = await context.CallActivityAsync<TaskExecution>(nameof(Activities.GetNextResearchForExecution), poolName);
        if (task == null)
        {
            output.Message = "Unable to get a task";
            context.SetCustomStatus(output);
            return;
        }
        if (string.IsNullOrEmpty(task.TaskName))
        {
            output.Message = task.Error;
            context.SetCustomStatus(output);
            return;
        }

        output.Message = "Got task";
        output.CurrentTask = task;
        if (task.NextExecution.HasValue)
        {
            // If the next task can't be executed before a given date, sleep until that date
            output.Message = "Need to wait to respect task delay";
            output.NextExcecution = task.NextExecution.Value;
            context.SetCustomStatus(output);

            try
            {
                await context.CreateTimer(task.NextExecution.Value, tokens[poolName].Token);
            }
            catch (OperationCanceledException)
            {
                output.Message = "Operation cancelled while sleeping";
                output.NextExcecution = null;
                context.SetCustomStatus(output);

                return;

                // After this line, the CustomStatus has not changed, and the orchestration is still in running state
            }
        }
        else
        {
            context.SetCustomStatus(output);
        }

        var executionSuccess = await context.CallActivityAsync<bool>(nameof(Activities.ExecuteResearchActivity), task.TaskName);

        var doneSuccess = await context.CallActivityAsync<bool>(nameof(Activities.SetExecutingTaskDone), poolName);

        DateTime nextTask = context.CurrentUtcDateTime.AddSeconds(10);
        output.Message = "Waiting next execution";
        output.NextExcecution = nextTask;
        output.CurrentTask = null;
        context.SetCustomStatus(output);

        try
        {
            await context.CreateTimer(nextTask, tokens[poolName].Token);
        }
        catch (OperationCanceledException)
        {
            output.Message = "Operation cancelled";
            output.NextExcecution = null;
            context.SetCustomStatus(output);
            return;
        }

        context.ContinueAsNew(poolName);
    }

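As you can see from the catch block, execution does enter it when I debug, but when I query the instances with Postman (http://localhost:7071/runtime/webhooks/durabletask/instances), the orchestration is still reported as running and the custom status is unchanged.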

I have put sample code here: https://github.com/Rizov74/TestDurableOrchestrator

OK, I found a way to do it using external events.

Instead of:

        try
        {
            await context.CreateTimer(task.NextExecution.Value, tokens[poolName].Token);
        }
        catch (OperationCanceledException)
        {
            output.Message = "Operation cancelled while sleeping";
            output.NextExcecution = null;
            context.SetCustomStatus(output);

            return;

            // After this line, the CustomStatus has not changed, and the orchestration is still in running state
        }

I use:

            using (var cts = new CancellationTokenSource())
            {
                Task sleepingTask = context.CreateTimer(task.NextExecution.Value, output, cts.Token);
                // Completes when a client raises the "stop" event for this instance
                Task stopTask = context.WaitForExternalEvent("stop");

                Task winner = await Task.WhenAny(sleepingTask, stopTask);
                if (winner == sleepingTask)
                {
                    // Can continue
                }
                else
                {
                    // Cancel timer token
                    cts.Cancel();
                    if (!context.IsReplaying)
                        Console.WriteLine($"{context.InstanceId}: wait cancelled {poolName} : {task.TaskName}");

                    output.Message = "Operation cancelled while sleeping";
                    output.NextExcecution = null;
                    context.SetCustomStatus(output);

                    // If the watchdog has been cancelled, we need to tell the pool that the task is cancelled
                    var ok = await context.CallActivityAsync<bool>(nameof(Activities.CancelExecutingTask), poolName);

                    if (!context.IsReplaying)
                        Console.WriteLine($"{context.InstanceId}: task correctly canceled {poolName} : {task.TaskName}");

                    // Stop the watchdog
                    return;
                }
            }
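
The orchestrator above only stops once something raises the "stop" event for its instance. A minimal sketch of a client function that does this is shown below, assuming the in-process Durable Functions 2.x extension. The class name, function name, and route are hypothetical and are not taken from the repo; only the event name "stop" comes from the code above.

```csharp
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;

public static class WatchdogControl
{
    // Hypothetical endpoint: the function name and route are assumptions for illustration.
    [FunctionName("StopTaskWatchdog")]
    public static async Task<IActionResult> StopTaskWatchdog(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "watchdog/{instanceId}/stop")] HttpRequest req,
        [DurableClient] IDurableOrchestrationClient client,
        string instanceId)
    {
        // Check that the instance exists before sending the event.
        DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
        if (status == null)
        {
            return new NotFoundResult();
        }

        // "stop" must match the name passed to WaitForExternalEvent in the orchestrator.
        await client.RaiseEventAsync(instanceId, "stop", null);

        // The event is delivered asynchronously, so only acknowledge the request here.
        return new AcceptedResult();
    }
}
```

From Postman, the built-in HTTP API should accept the same event as a POST to http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/stop with a JSON body and Content-Type: application/json. Because the event is processed by the orchestrator itself, the status change and the return are recorded in the orchestration history, which is probably why this works where cancelling a shared token from outside did not.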

I have updated my repo on GitHub; if you want a more complex example of a durable orchestrator, you can take a look.