如何在 Orchestrator 中对大数据进行多线程处理?
How to multi-thread big data in the Orchestrator?
我在 Orchestrator 中有以下代码:
var parallelTasks = new List<Task>();
// Get Records
List<Record> records = await context.CallActivityAsync<List<Record>>("GetRecords", orchestrationContext);
// Write Records
foreach (Record record in records)
{
parallelTasks.Add(context.CallActivityAsync<int>("WriteRecord", record));
}
await Task.WhenAll(parallelTasks);
这失败了,因为 GetRecords return 的数据太多(60000 条记录)并且 Orchestrator 不会继续,因为 CallActivityAsync 不能 return 超过 8mb 的数据。
这也可能会失败,因为它实际上会尝试为每次写入启动 60000 个活动。
我这样做是为了让 Azure 使用多个线程写入 ADL。起初我尝试使用 Semaphores,多个在线来源告诉我不应该使用 Sempahores,而是 "CallActivityAsync",这将允许 Azure 管理它自己的线程。
如何解决这个问题并实现多线程写入ADL?
郑重声明,我使用的库一次只能写入一个文件(我知道 MS 的新库包含批量写入功能,但由于各种原因我无法使用它)。
是否有理由让 GetRecords
和 WriteRecord
处于持久功能设置中?如果不是,GetRecords
可以将每个 Record
对象(序列化为 JSON)放到 Azure Queue/EventHub,而不是返回一个巨大的列表。然后可以从 Queue/EventHub 触发 WriteRecords
来处理每条消息。
我在 Orchestrator 中有以下代码:
var parallelTasks = new List<Task>();
// Get Records
List<Record> records = await context.CallActivityAsync<List<Record>>("GetRecords", orchestrationContext);
// Write Records
foreach (Record record in records)
{
parallelTasks.Add(context.CallActivityAsync<int>("WriteRecord", record));
}
await Task.WhenAll(parallelTasks);
这失败了,因为 GetRecords return 的数据太多(60000 条记录)并且 Orchestrator 不会继续,因为 CallActivityAsync 不能 return 超过 8mb 的数据。
这也可能会失败,因为它实际上会尝试为每次写入启动 60000 个活动。
我这样做是为了让 Azure 使用多个线程写入 ADL。起初我尝试使用 Semaphores,多个在线来源告诉我不应该使用 Sempahores,而是 "CallActivityAsync",这将允许 Azure 管理它自己的线程。
如何解决这个问题并实现多线程写入ADL?
郑重声明,我使用的库一次只能写入一个文件(我知道 MS 的新库包含批量写入功能,但由于各种原因我无法使用它)。
是否有理由让 GetRecords
和 WriteRecord
处于持久功能设置中?如果不是,GetRecords
可以将每个 Record
对象(序列化为 JSON)放到 Azure Queue/EventHub,而不是返回一个巨大的列表。然后可以从 Queue/EventHub 触发 WriteRecords
来处理每条消息。