在长 运行 天蓝色数据工厂管道上放置警报的方法
Method to put alerts on long running azure data factory pipeline
我有一些数据工厂管道,在将数据从 blob 复制到 SQL 时,有时可能 运行 超过 2 小时。时间段是可变的,但我想 notified/alerted 当任何管道 运行 超过 2 小时时。
有哪些可行的方法?
到目前为止我尝试过的:
- 探索了我可以设置警报规则的 adf 指标。但似乎有 none 谈论活动 运行 的持续时间。
- 我希望获得我们在 adf.azure.com 的监视器选项卡上看到的管道的持续时间值,并使用它来发出某种警报。
- 我也在想,如果我可以获得管道开始时间,那么也许我可以从当前时间计算出总 运行 时间,并在此基础上设置一些警报。
一种变通的方法是在您的 SQL 数据库中记录一个时间戳作为管道的第一步,然后通过监视数据库引擎中的会话来跟踪负载.
我们做这样的事情来跟踪 运行 管道和管理执行并发性。我发现 Logic Apps 和 Azure Functions 是创建此类解决方案的绝佳工具。以下是我们如何处理此问题的粗略概述:
- 一组利用
Microsoft.Azure.Management.DataFactory SDK。相关代码在这个post.
的底部
- SQL 服务器 table 中的管道执行日志。 table 包括 PipelineId
和状态,以及一些其他信息。每当您创建管道时,您都需要插入此 table。我们使用单独的逻辑应用调用 AF 以使用下面代码中的 "RunPipelineAsync" 方法执行管道,捕获新的 PipelineId (RunId),并将其发送到存储过程以记录 PipelineId。
- 逻辑应用程序 运行 重复触发(每 3 分钟)
a) 调用一个存储过程来轮询 table(上面的#2)和 returns 所有状态为 "InProgress" 的管道;
b) foreach 遍历返回的列表并调用 AF(上面的#1),使用下面代码中的 "GetPipelineInfoAsync" 方法检查管道的当前状态;
和
c) 调用另一个存储过程来更新 table.
中的状态
您可以做类似的事情,并使用 "DurationInMS" 根据状态 = "InProgress" 和总 运行 时间> {所需警报阈值}生成适当的操作。
这是我使用的 DataFactoryHelper class:
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest;
using Microsoft.Azure.Management.ResourceManager;
using Microsoft.Azure.Management.DataFactory;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace AzureUtilities.DataFactory
{
public class DataFactoryHelper
{
private ClientCredential Credentials { get; set; }
private string KeyVaultUrl { get; set; }
private string TenantId { get; set; }
private string SubscriptionId { get; set; }
private DataFactoryManagementClient _client = null;
private DataFactoryManagementClient Client
{
get {
if (_client == null)
{
var context = new AuthenticationContext("https://login.windows.net/" + TenantId);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", Credentials).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
_client = new DataFactoryManagementClient(cred) { SubscriptionId = SubscriptionId };
}
return _client;
}
}
public DataFactoryHelper(string servicePrincipalId, string servicePrincipalKey, string tenantId, string subscriptionId)
{
Credentials = new ClientCredential(servicePrincipalId, servicePrincipalKey);
TenantId = tenantId;
SubscriptionId = subscriptionId;
}
public async Task<string> RunPipelineAsync(string resourceGroupName,
string dataFactoryName,
string pipelineName,
Dictionary<string, object> parameters = null,
Dictionary<string, List<string>> customHeaders = null)
{
var runResponse = await Client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroupName, dataFactoryName, pipelineName, parameters: parameters , customHeaders: customHeaders);
return runResponse.Body.RunId;
}
public async Task<object> GetPipelineInfoAsync(string resourceGroup, string dataFactory, string runId)
{
var info = await Client.PipelineRuns.GetAsync(resourceGroup, dataFactory, runId);
return new
{
RunId = info.RunId,
PipelineName = info.PipelineName,
InvokedBy = info.InvokedBy.Name,
LastUpdated = info.LastUpdated,
RunStart = info.RunStart,
RunEnd = info.RunEnd,
DurationInMs = info.DurationInMs,
Status = info.Status,
Message = info.Message
};
}
}
}
我有一些数据工厂管道,在将数据从 blob 复制到 SQL 时,有时可能 运行 超过 2 小时。时间段是可变的,但我想 notified/alerted 当任何管道 运行 超过 2 小时时。
有哪些可行的方法?
到目前为止我尝试过的:
- 探索了我可以设置警报规则的 adf 指标。但似乎有 none 谈论活动 运行 的持续时间。
- 我希望获得我们在 adf.azure.com 的监视器选项卡上看到的管道的持续时间值,并使用它来发出某种警报。
- 我也在想,如果我可以获得管道开始时间,那么也许我可以从当前时间计算出总 运行 时间,并在此基础上设置一些警报。
一种变通的方法是在您的 SQL 数据库中记录一个时间戳作为管道的第一步,然后通过监视数据库引擎中的会话来跟踪负载.
我们做这样的事情来跟踪 运行 管道和管理执行并发性。我发现 Logic Apps 和 Azure Functions 是创建此类解决方案的绝佳工具。以下是我们如何处理此问题的粗略概述:
- 一组利用 Microsoft.Azure.Management.DataFactory SDK。相关代码在这个post. 的底部
- SQL 服务器 table 中的管道执行日志。 table 包括 PipelineId 和状态,以及一些其他信息。每当您创建管道时,您都需要插入此 table。我们使用单独的逻辑应用调用 AF 以使用下面代码中的 "RunPipelineAsync" 方法执行管道,捕获新的 PipelineId (RunId),并将其发送到存储过程以记录 PipelineId。
- 逻辑应用程序 运行 重复触发(每 3 分钟) a) 调用一个存储过程来轮询 table(上面的#2)和 returns 所有状态为 "InProgress" 的管道; b) foreach 遍历返回的列表并调用 AF(上面的#1),使用下面代码中的 "GetPipelineInfoAsync" 方法检查管道的当前状态; 和 c) 调用另一个存储过程来更新 table. 中的状态
您可以做类似的事情,并使用 "DurationInMS" 根据状态 = "InProgress" 和总 运行 时间> {所需警报阈值}生成适当的操作。
这是我使用的 DataFactoryHelper class:
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest;
using Microsoft.Azure.Management.ResourceManager;
using Microsoft.Azure.Management.DataFactory;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace AzureUtilities.DataFactory
{
public class DataFactoryHelper
{
private ClientCredential Credentials { get; set; }
private string KeyVaultUrl { get; set; }
private string TenantId { get; set; }
private string SubscriptionId { get; set; }
private DataFactoryManagementClient _client = null;
private DataFactoryManagementClient Client
{
get {
if (_client == null)
{
var context = new AuthenticationContext("https://login.windows.net/" + TenantId);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", Credentials).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
_client = new DataFactoryManagementClient(cred) { SubscriptionId = SubscriptionId };
}
return _client;
}
}
public DataFactoryHelper(string servicePrincipalId, string servicePrincipalKey, string tenantId, string subscriptionId)
{
Credentials = new ClientCredential(servicePrincipalId, servicePrincipalKey);
TenantId = tenantId;
SubscriptionId = subscriptionId;
}
public async Task<string> RunPipelineAsync(string resourceGroupName,
string dataFactoryName,
string pipelineName,
Dictionary<string, object> parameters = null,
Dictionary<string, List<string>> customHeaders = null)
{
var runResponse = await Client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroupName, dataFactoryName, pipelineName, parameters: parameters , customHeaders: customHeaders);
return runResponse.Body.RunId;
}
public async Task<object> GetPipelineInfoAsync(string resourceGroup, string dataFactory, string runId)
{
var info = await Client.PipelineRuns.GetAsync(resourceGroup, dataFactory, runId);
return new
{
RunId = info.RunId,
PipelineName = info.PipelineName,
InvokedBy = info.InvokedBy.Name,
LastUpdated = info.LastUpdated,
RunStart = info.RunStart,
RunEnd = info.RunEnd,
DurationInMs = info.DurationInMs,
Status = info.Status,
Message = info.Message
};
}
}
}