如何在 Orchestrator 中对大数据进行多线程处理?

How to multi-thread big data in the Orchestrator?

我在 Orchestrator 中有以下代码:

        var parallelTasks = new List<Task>();

        // Get Records
        List<Record> records = await context.CallActivityAsync<List<Record>>("GetRecords", orchestrationContext);

        // Write Records
        foreach (Record record in records)
        {
            parallelTasks.Add(context.CallActivityAsync<int>("WriteRecord", record));
        }

        await Task.WhenAll(parallelTasks);

这失败了,因为 GetRecords return 的数据太多(60000 条记录)并且 Orchestrator 不会继续,因为 CallActivityAsync 不能 return 超过 8mb 的数据。

这也可能会失败,因为它实际上会尝试为每次写入启动 60000 个活动。

我这样做是为了让 Azure 使用多个线程写入 ADL。起初我尝试使用 Semaphores,多个在线来源告诉我不应该使用 Sempahores,而是 "CallActivityAsync",这将允许 Azure 管理它自己的线程。

如何解决这个问题并实现多线程写入ADL?

郑重声明,我使用的库一次只能写入一个文件(我知道 MS 的新库包含批量写入功能,但由于各种原因我无法使用它)。

是否有理由让 GetRecordsWriteRecord 处于持久功能设置中?如果不是,GetRecords 可以将每个 Record 对象(序列化为 JSON)放到 Azure Queue/EventHub,而不是返回一个巨大的列表。然后可以从 Queue/EventHub 触发 WriteRecords 来处理每条消息。