运行 Azure Batch中的Azure Data Factory Activities时,应该如何处理异步

When running Azure Data Factory Activities in Azure Batch, how should asynchronicity be handled

背景

我稍微简化了这个场景,但这是普遍问题。

我正在使用 Azure 数据工厂将自定义 API 中的数据提取到 Azure 数据仓库中的 table 中。我正在使用 IDotNetActivity 来 运行 调用 API 的 C# 代码并将数据加载到数据仓库中。 Azure Batch 中的 activity 运行。

在 activity 本身中,在我调用自定义 API 之前,我从 Azure Blob 存储中的文件加载人员列表。 然后,我为文件中的每个人调用自定义 API。 这些调用是依次进行的。 问题是这种方法花费的时间太长。 文件大小可能会增加,因此花费的时间只会变得更糟。

我尝试提高性能的事情

主要问题

Azure Batch 是否支持异步/等待?

更多问题

据我了解,personIds.Batch(100) 只是将 personIds 分批放入大小为 (100) 的桶中。

//method1
foreach (var personIdsBatch in personIds.Batch(100))
{
    customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
}

//method2
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIds));

上述两种方法都会按顺序为每个人调用您的自定义 API,而 method1 添加了处理相同任务的额外逻辑。

Does Azure Batch support async / await?

根据你的代码,我定义了IDotNetActivity实现如下,你可以参考一下:

public class MyDotNetActivity : IDotNetActivity
{
    public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
    {
        return ExecuteAsync(linkedServices, datasets, activity, logger).Result;
    }

    async Task<IDictionary<string, string>> ExecuteAsync(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
    {
        List<int> personIds = await GetPersonIds("{clientAddress}", "{clientUsername}", "{clientPassword}");
        var tasks = new List<Task<List<CustomApiResult>>>();
        foreach (var personIdsBatch in personIds.Batch(100))
        {
            tasks.AddRange(GetCustomResultsByBatch("{address}", "{username}", "{password}", "{personIdsBatch}"));
        }

        var taskResults = await Task.WhenAll(tasks);
        List<CustomApiResult> customResults = taskResults.SelectMany(r=>r).ToList();

        //process the custom api results

        return new Dictionary<string, string>();
    }

    async Task<List<CustomApiResult>> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch)
    {
        //Get Custom Results By Batch
        return new List<CustomApiResult>();
    }

    async Task<List<int>> GetPersonIds(string clientAddress, string clientUsername, string clientPassword)
    {
        //load a list of people from a file in Azure Blob storage
        return new List<int>(); 
    }
}

此外,我假设您可以利用 Parallel.ForEach 来并行执行您的同步作业:

List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
var customResults = new List<CustomApiResult>();
Parallel.ForEach(personIds.Batch(100), 
new ParallelOptions()
{
    MaxDegreeOfParallelism=5
},
(personIdsBatch) =>
{
    var results = GetCustomResultsByBatch(address, username, password, personIdsBatch);
    lock (customResults)
    {
        customResults.AddRange(results);
    }
});