Azure 持久函数 - CallActivityAsync 时出现 InvalidOperationException

Azure Durable function - InvalidOperationException when CallActivityAsync

我正在玩 Azure Durable functions。目前我在调用 activity 后在 Orchestration 函数中得到 InvalidOperationException。它抱怨检测到 多线程执行。如果 orchestrator 函数先前从不受支持的异步回调 恢复,则可能会发生这种情况。

有人遇到过这样的问题吗?我做错了什么? 完整的代码可以在 GitHub

上找到

这是编排函数的行:

var res = await ctx.CallActivityAsync<int>("LengthCheck", "inputData");

LengthCheck 活动函数是:

[FunctionName("LengthCheck")]
public static Task<int> Calc([ActivityTrigger] string input)
{
    var task = Task.Delay(TimeSpan.FromSeconds(5));
    task.Wait();
    return Task.FromResult(input.Length);
}

堆栈跟踪是:

ac6fd5cdd07a4dc9b2577657d65c4f27: Function 'InpaintOrchestration (Orchestrator)', version '' failed with an error. Reason: System.InvalidOperationException: Multithreaded execution was detected. This can happen if the orchestrator function previously resumed from an unsupported async callback.

at Microsoft.Azure.WebJobs.DurableOrchestrationContext.ThrowIfInvalidAccess()

at Microsoft.Azure.WebJobs.DurableOrchestrationContext.d__47`1.MoveNext()

End of stack trace from previous location where exception was thrown

at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)

at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)

at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()

只要协调器函数以不受支持的方式执行异步工作,就会发生此异常。在这种情况下,“不受支持”实际上意味着 await 被用于 non-durable 任务(而“non-durable”意味着它是来自其他 API 以外的任务IDurableOrchestrationContext).

您可以在此处找到有关协调程序函数的代码约束的更多信息:https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-code-constraints

以下是我快速扫描您的代码时违反的规则:

  • Orchestrator code should be non-blocking. For example, that means no I/O and no calls to Thread.Sleep or equivalent APIs. If an orchestrator needs to delay, it can use the CreateTimer API.

  • Orchestrator code must never initiate any async operation except by using the IDurableOrchestrationContext API. For example, no Task.Run, Task.Delay or HttpClient.SendAsync. The Durable Task Framework executes orchestrator code on a single thread and cannot interact with any other threads that could be scheduled by other async APIs.

当我们检测到进行了不支持的异步调用时,就会出现此异常。我注意到这段代码中发生了这种情况:

    private static async Task SaveImageLabToBlob(ZsImage imageLab, CloudBlobContainer container, string fileName)
    {
        var argbImage = imageLab
            .Clone()
            .FromLabToRgb()
            .FromRgbToArgb(Area2D.Create(0, 0, imageLab.Width, imageLab.Height));

        using (var bitmap = argbImage.FromArgbToBitmap())
        using (var outputStream = new MemoryStream())
        {
            // modify image
            bitmap.Save(outputStream, ImageFormat.Png);

            // save the result back
            outputStream.Position = 0;
            var resultImageBlob = container.GetBlockBlobReference(fileName);
            await resultImageBlob.UploadFromStreamAsync(outputStream);
        }
    }

进行异步或阻塞调用的正确方法是将它们包装在 activity 函数中,这些函数没有任何这些约束。

在此扩展的最新版本(v1.3.2 及更高版本)中,我们在异常消息中描述 code-constraints 的文档中包含了 link。

我的持久协调器功能也发生了这种情况。我必须摆脱 activity 函数调用的所有 .ConfigureAwait(false) 结尾。

 //invoking First activity function
        var id = await context.CallActivityAsync<Guid>(Function1, requestModel);
        

        //invoking second activity function that uses data from the first activity function without ConfigureAwait(false)
        var readModel = await context.CallActivityAsync<ReadModel>(Function2, id);
       

        //invoking third activity function that uses data from the second activity function without ConfigureAwait(false)
        await context.CallActivityAsync(Function3, cartReadModel);