Durable Orchestrator 在 Task.WhenAll 后卡住
Durable Orchestrator gets stuck after Task.WhenAll
我有一个包含 4 个活动的协调器:
- PromoDataFromActpm - 下载一些数据的单个 activity
来自 API.
- PromoDataExport - 单个 activity 从 activity #1 发送数据
到 Azure 存储
- SavePromoProductFromACPSActivity - 并行活动,对于
activity #1 中的每个项目调用一些 API 并下载
一些数据
- TableToBlobPromoProductActivity - 写入的并行活动
从 activity #3 到 blob 存储
的项目
对于活动#3,集合中的每个项目都是 1 activity 调用,对于 activity#4,它在每 1 activity 调用中分批收集 50 个项目,正在Task.WhenAll.
正在等待
在本地环境中一切正常,但在 azure 上,由于某种原因,orchestrator 在 activity #3 Task.WhenAll 后停止处理。
我在日志中收到许多 SavePromoProductFromACPSActivity 的请求,这是应该的,但一段时间后它们停止并且 TableToBlobPromoProductActivity activity 从未被调用.我偶尔会收到“正在执行 XYZ 协调器”,几分钟后会收到“已执行 XYZ 协调器”,这些消息之间没有 activity 调用。
我已经和它抗争了一段时间,但没有成功。有什么想法吗?
代码如下:
var orchestrationId = context.InstanceId.Replace(":","");
var promoData = await context.CallActivityAsync<PromotionExportModel[]>(FunctionNamesExport.Activity.PromoDataFromActpm, null);
var exportResult = await context.CallActivityAsync<OperationResponse>(FunctionNamesExport.Activity.PromoDataExport, promoData);
var acpsTasks = new List<Task<List<PromotedProductExportModel>>>();
var acpsPromos = new List<PromotedProductExportModel>();
foreach (var promo in promoData)
{
acpsTasks.Add(context.CallActivityAsync<List<PromotedProductExportModel>>(FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo));
}
await Task.WhenAll(acpsTasks);
acpsTasks.ForEach(x => acpsPromos.AddRange(x.Result));
var promoDataBatched = acpsPromos.Batch(50);
var tasks = new List<Task>();
foreach(var arr in promoDataBatched)
{
var promoBlob = new PromotionExportSubModel
{
PromotionExportModel = arr.ToArray(),
blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray()),
orchestrationId = orchestrationId
};
tasks.Add(context.CallActivityAsync(FunctionNamesExport.Activity.TableToBlobPromoProductActivity, promoBlob));
}
await Task.WhenAll(tasks);
由于您在评论中提到您将在第 3 步中并行启动 3000 个活动,所以我知道会发生什么。
请记住,每个 activity 都会将行添加到历史记录 table 中,所有这些行都需要在 orchestrator 的每次重播中加载(发生在每个 activity returns 之后)。
所以加载时间会不断增加,内存使用量也会增加。
我为此使用的典型解决方案是将工作拆分为 sub-orchestrators。
例如,将数据分成 100 个批次,为每个批次启动一个 sub-orchestrator 到 运行 并行。
然后在 sub-orchestrator 中执行实际的第 3 步活动。
这样,sub-orchestrator 实例的历史记录 table 行的上限为 100 个活动所需的行,而主编排器仅获得 ~30 个结果。
您可以对第 4 步做类似的事情。
我有一个包含 4 个活动的协调器:
- PromoDataFromActpm - 下载一些数据的单个 activity 来自 API.
- PromoDataExport - 单个 activity 从 activity #1 发送数据 到 Azure 存储
- SavePromoProductFromACPSActivity - 并行活动,对于 activity #1 中的每个项目调用一些 API 并下载 一些数据
- TableToBlobPromoProductActivity - 写入的并行活动 从 activity #3 到 blob 存储 的项目
对于活动#3,集合中的每个项目都是 1 activity 调用,对于 activity#4,它在每 1 activity 调用中分批收集 50 个项目,正在Task.WhenAll.
正在等待在本地环境中一切正常,但在 azure 上,由于某种原因,orchestrator 在 activity #3 Task.WhenAll 后停止处理。 我在日志中收到许多 SavePromoProductFromACPSActivity 的请求,这是应该的,但一段时间后它们停止并且 TableToBlobPromoProductActivity activity 从未被调用.我偶尔会收到“正在执行 XYZ 协调器”,几分钟后会收到“已执行 XYZ 协调器”,这些消息之间没有 activity 调用。
我已经和它抗争了一段时间,但没有成功。有什么想法吗?
代码如下:
var orchestrationId = context.InstanceId.Replace(":","");
var promoData = await context.CallActivityAsync<PromotionExportModel[]>(FunctionNamesExport.Activity.PromoDataFromActpm, null);
var exportResult = await context.CallActivityAsync<OperationResponse>(FunctionNamesExport.Activity.PromoDataExport, promoData);
var acpsTasks = new List<Task<List<PromotedProductExportModel>>>();
var acpsPromos = new List<PromotedProductExportModel>();
foreach (var promo in promoData)
{
acpsTasks.Add(context.CallActivityAsync<List<PromotedProductExportModel>>(FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo));
}
await Task.WhenAll(acpsTasks);
acpsTasks.ForEach(x => acpsPromos.AddRange(x.Result));
var promoDataBatched = acpsPromos.Batch(50);
var tasks = new List<Task>();
foreach(var arr in promoDataBatched)
{
var promoBlob = new PromotionExportSubModel
{
PromotionExportModel = arr.ToArray(),
blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray()),
orchestrationId = orchestrationId
};
tasks.Add(context.CallActivityAsync(FunctionNamesExport.Activity.TableToBlobPromoProductActivity, promoBlob));
}
await Task.WhenAll(tasks);
由于您在评论中提到您将在第 3 步中并行启动 3000 个活动,所以我知道会发生什么。 请记住,每个 activity 都会将行添加到历史记录 table 中,所有这些行都需要在 orchestrator 的每次重播中加载(发生在每个 activity returns 之后)。 所以加载时间会不断增加,内存使用量也会增加。
我为此使用的典型解决方案是将工作拆分为 sub-orchestrators。 例如,将数据分成 100 个批次,为每个批次启动一个 sub-orchestrator 到 运行 并行。 然后在 sub-orchestrator 中执行实际的第 3 步活动。 这样,sub-orchestrator 实例的历史记录 table 行的上限为 100 个活动所需的行,而主编排器仅获得 ~30 个结果。 您可以对第 4 步做类似的事情。