运行 Azure Batch中的Azure Data Factory Activities时,应该如何处理异步
When running Azure Data Factory Activities in Azure Batch, how should asynchronicity be handled
背景
我稍微简化了这个场景,但这是普遍问题。
我正在使用 Azure 数据工厂将自定义 API 中的数据提取到 Azure 数据仓库中的 table 中。我正在使用 IDotNetActivity 来 运行 调用 API 的 C# 代码并将数据加载到数据仓库中。 Azure Batch 中的 activity 运行。
在 activity 本身中,在我调用自定义 API 之前,我从 Azure Blob 存储中的文件加载人员列表。
然后,我为文件中的每个人调用自定义 API。
这些调用是依次进行的。
问题是这种方法花费的时间太长。
文件大小可能会增加,因此花费的时间只会变得更糟。
我尝试提高性能的事情
- 使 API 调用异步并以 3 个为一组调用它们。St运行gely 这个 运行 更慢。看起来批处理不能很好地处理异步/等待。
- 我们看到的其他 st运行geness 是 MoreLinq 的批处理命令根本不起作用。我已经检查了源代码:
https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs。这使用 yield return 但我不知道为什么这不起作用,即使它与 async / await 问题有关。
主要问题
Azure Batch 是否支持异步/等待?
更多问题
- 如果 Azure 不支持异步/等待,那么解决此问题的更好方法是什么?即使用作业管理器并启动更多节点。
谁能解释一下为什么 MoreLinq 的 Batch 在 Azure Batch 中不起作用?
以下是受影响的代码片段:
List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
var customResults = new List<CustomApiResult>();
foreach (var personIdsBatch in personIds.Batch(100))
{
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
}
据我了解,personIds.Batch(100)
只是将 personIds
分批放入大小为 (100) 的桶中。
//method1
foreach (var personIdsBatch in personIds.Batch(100))
{
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
}
//method2
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIds));
上述两种方法都会按顺序为每个人调用您的自定义 API,而 method1
添加了处理相同任务的额外逻辑。
Does Azure Batch support async / await?
根据你的代码,我定义了IDotNetActivity
实现如下,你可以参考一下:
public class MyDotNetActivity : IDotNetActivity
{
public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
{
return ExecuteAsync(linkedServices, datasets, activity, logger).Result;
}
async Task<IDictionary<string, string>> ExecuteAsync(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
{
List<int> personIds = await GetPersonIds("{clientAddress}", "{clientUsername}", "{clientPassword}");
var tasks = new List<Task<List<CustomApiResult>>>();
foreach (var personIdsBatch in personIds.Batch(100))
{
tasks.AddRange(GetCustomResultsByBatch("{address}", "{username}", "{password}", "{personIdsBatch}"));
}
var taskResults = await Task.WhenAll(tasks);
List<CustomApiResult> customResults = taskResults.SelectMany(r=>r).ToList();
//process the custom api results
return new Dictionary<string, string>();
}
async Task<List<CustomApiResult>> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch)
{
//Get Custom Results By Batch
return new List<CustomApiResult>();
}
async Task<List<int>> GetPersonIds(string clientAddress, string clientUsername, string clientPassword)
{
//load a list of people from a file in Azure Blob storage
return new List<int>();
}
}
此外,我假设您可以利用 Parallel.ForEach 来并行执行您的同步作业:
List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
var customResults = new List<CustomApiResult>();
Parallel.ForEach(personIds.Batch(100),
new ParallelOptions()
{
MaxDegreeOfParallelism=5
},
(personIdsBatch) =>
{
var results = GetCustomResultsByBatch(address, username, password, personIdsBatch);
lock (customResults)
{
customResults.AddRange(results);
}
});
背景
我稍微简化了这个场景,但这是普遍问题。
我正在使用 Azure 数据工厂将自定义 API 中的数据提取到 Azure 数据仓库中的 table 中。我正在使用 IDotNetActivity 来 运行 调用 API 的 C# 代码并将数据加载到数据仓库中。 Azure Batch 中的 activity 运行。
在 activity 本身中,在我调用自定义 API 之前,我从 Azure Blob 存储中的文件加载人员列表。 然后,我为文件中的每个人调用自定义 API。 这些调用是依次进行的。 问题是这种方法花费的时间太长。 文件大小可能会增加,因此花费的时间只会变得更糟。
我尝试提高性能的事情
- 使 API 调用异步并以 3 个为一组调用它们。St运行gely 这个 运行 更慢。看起来批处理不能很好地处理异步/等待。
- 我们看到的其他 st运行geness 是 MoreLinq 的批处理命令根本不起作用。我已经检查了源代码: https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs。这使用 yield return 但我不知道为什么这不起作用,即使它与 async / await 问题有关。
主要问题
Azure Batch 是否支持异步/等待?
更多问题
- 如果 Azure 不支持异步/等待,那么解决此问题的更好方法是什么?即使用作业管理器并启动更多节点。
谁能解释一下为什么 MoreLinq 的 Batch 在 Azure Batch 中不起作用? 以下是受影响的代码片段:
List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword); var customResults = new List<CustomApiResult>(); foreach (var personIdsBatch in personIds.Batch(100)) { customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch)); }
据我了解,personIds.Batch(100)
只是将 personIds
分批放入大小为 (100) 的桶中。
//method1
foreach (var personIdsBatch in personIds.Batch(100))
{
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
}
//method2
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIds));
上述两种方法都会按顺序为每个人调用您的自定义 API,而 method1
添加了处理相同任务的额外逻辑。
Does Azure Batch support async / await?
根据你的代码,我定义了IDotNetActivity
实现如下,你可以参考一下:
public class MyDotNetActivity : IDotNetActivity
{
public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
{
return ExecuteAsync(linkedServices, datasets, activity, logger).Result;
}
async Task<IDictionary<string, string>> ExecuteAsync(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
{
List<int> personIds = await GetPersonIds("{clientAddress}", "{clientUsername}", "{clientPassword}");
var tasks = new List<Task<List<CustomApiResult>>>();
foreach (var personIdsBatch in personIds.Batch(100))
{
tasks.AddRange(GetCustomResultsByBatch("{address}", "{username}", "{password}", "{personIdsBatch}"));
}
var taskResults = await Task.WhenAll(tasks);
List<CustomApiResult> customResults = taskResults.SelectMany(r=>r).ToList();
//process the custom api results
return new Dictionary<string, string>();
}
async Task<List<CustomApiResult>> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch)
{
//Get Custom Results By Batch
return new List<CustomApiResult>();
}
async Task<List<int>> GetPersonIds(string clientAddress, string clientUsername, string clientPassword)
{
//load a list of people from a file in Azure Blob storage
return new List<int>();
}
}
此外,我假设您可以利用 Parallel.ForEach 来并行执行您的同步作业:
List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
var customResults = new List<CustomApiResult>();
Parallel.ForEach(personIds.Batch(100),
new ParallelOptions()
{
MaxDegreeOfParallelism=5
},
(personIdsBatch) =>
{
var results = GetCustomResultsByBatch(address, username, password, personIdsBatch);
lock (customResults)
{
customResults.AddRange(results);
}
});