Durable Orchestrator 在 Task.WhenAll 后卡住

Durable Orchestrator gets stuck after Task.WhenAll

我有一个包含 4 个活动的协调器:

  1. PromoDataFromActpm - 下载一些数据的单个 activity 来自 API.
  2. PromoDataExport - 单个 activity 从 activity #1 发送数据 到 Azure 存储
  3. SavePromoProductFromACPSActivity - 并行活动,对于 activity #1 中的每个项目调用一些 API 并下载 一些数据
  4. TableToBlobPromoProductActivity - 写入的并行活动 从 activity #3 到 blob 存储
  5. 的项目

对于活动#3,集合中的每个项目都是 1 activity 调用,对于 activity#4,它在每 1 activity 调用中分批收集 50 个项目,正在Task.WhenAll.

正在等待

在本地环境中一切正常,但在 azure 上,由于某种原因,orchestrator 在 activity #3 Task.WhenAll 后停止处理。 我在日志中收到许多 SavePromoProductFromACPSActivity 的请求,这是应该的,但一段时间后它们停止并且 TableToBlobPromoProductActivity activity 从未被调用.我偶尔会收到“正在执行 XYZ 协调器”,几分钟后会收到“已执行 XYZ 协调器”,这些消息之间没有 activity 调用。

我已经和它抗争了一段时间,但没有成功。有什么想法吗?

代码如下:

            var orchestrationId = context.InstanceId.Replace(":","");

            var promoData = await context.CallActivityAsync<PromotionExportModel[]>(FunctionNamesExport.Activity.PromoDataFromActpm, null);
            var exportResult = await context.CallActivityAsync<OperationResponse>(FunctionNamesExport.Activity.PromoDataExport, promoData);

            var acpsTasks = new List<Task<List<PromotedProductExportModel>>>();
            var acpsPromos = new List<PromotedProductExportModel>();
            foreach (var promo in promoData)
            {
                acpsTasks.Add(context.CallActivityAsync<List<PromotedProductExportModel>>(FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo));
            }
            await Task.WhenAll(acpsTasks);
            acpsTasks.ForEach(x => acpsPromos.AddRange(x.Result));

            var promoDataBatched = acpsPromos.Batch(50);
            var tasks = new List<Task>();
            foreach(var arr in promoDataBatched)
            {
                var promoBlob = new PromotionExportSubModel
                {
                    PromotionExportModel = arr.ToArray(),
                    blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray()),
                    orchestrationId = orchestrationId
                };
tasks.Add(context.CallActivityAsync(FunctionNamesExport.Activity.TableToBlobPromoProductActivity, promoBlob));
            }
            await Task.WhenAll(tasks);

由于您在评论中提到您将在第 3 步中并行启动 3000 个活动,所以我知道会发生什么。 请记住,每个 activity 都会将行添加到历史记录 table 中,所有这些行都需要在 orchestrator 的每次重播中加载(发生在每个 activity returns 之后)。 所以加载时间会不断增加,内存使用量也会增加。

我为此使用的典型解决方案是将工作拆分为 sub-orchestrators。 例如,将数据分成 100 个批次,为每个批次启动一个 sub-orchestrator 到 运行 并行。 然后在 sub-orchestrator 中执行实际的第 3 步活动。 这样,sub-orchestrator 实例的历史记录 table 行的上限为 100 个活动所需的行,而主编排器仅获得 ~30 个结果。 您可以对第 4 步做类似的事情。