在长 运行 天蓝色数据工厂管道上放置警报的方法

Method to put alerts on long running azure data factory pipeline

我有一些数据工厂管道,在将数据从 blob 复制到 SQL 时,有时可能 运行 超过 2 小时。时间段是可变的,但我想 notified/alerted 当任何管道 运行 超过 2 小时时。

有哪些可行的方法?

到目前为止我尝试过的:

一种变通的方法是在您的 SQL 数据库中记录一个时间戳作为管道的第一步,然后通过监视数据库引擎中的会话来跟踪负载.

我们做这样的事情来跟踪 运行 管道和管理执行并发性。我发现 Logic Apps 和 Azure Functions 是创建此类解决方案的绝佳工具。以下是我们如何处理此问题的粗略概述:

  1. 一组利用 Microsoft.Azure.Management.DataFactory SDK。相关代码在这个post.
  2. 的底部
  3. SQL 服务器 table 中的管道执行日志。 table 包括 PipelineId 和状态,以及一些其他信息。每当您创建管道时,您都需要插入此 table。我们使用单独的逻辑应用调用 AF 以使用下面代码中的 "RunPipelineAsync" 方法执行管道,捕获新的 PipelineId (RunId),并将其发送到存储过程以记录 PipelineId。
  4. 逻辑应用程序 运行 重复触发(每 3 分钟) a) 调用一个存储过程来轮询 table(上面的#2)和 returns 所有状态为 "InProgress" 的管道; b) foreach 遍历返回的列表并调用 AF(上面的#1),使用下面代码中的 "GetPipelineInfoAsync" 方法检查管道的当前状态; 和 c) 调用另一个存储过程来更新 table.
  5. 中的状态

您可以做类似的事情,并使用 "DurationInMS" 根据状态 = "InProgress" 和总 运行 时间> {所需警报阈值}生成适当的操作。

这是我使用的 DataFactoryHelper class:

using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest;
using Microsoft.Azure.Management.ResourceManager;
using Microsoft.Azure.Management.DataFactory;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace AzureUtilities.DataFactory
{
    public class DataFactoryHelper
    {
        private ClientCredential Credentials { get; set; }
        private string KeyVaultUrl { get; set; }
        private string TenantId { get; set; }
        private string SubscriptionId { get; set; }

        private DataFactoryManagementClient _client = null;
        private DataFactoryManagementClient Client
        {
            get {
                if (_client == null)
                {
                    var context = new AuthenticationContext("https://login.windows.net/" + TenantId);
                    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", Credentials).Result;
                    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
                    _client = new DataFactoryManagementClient(cred) { SubscriptionId = SubscriptionId };
                }

                return _client;
            }
        }

        public DataFactoryHelper(string servicePrincipalId, string servicePrincipalKey, string tenantId, string subscriptionId)
        {
            Credentials = new ClientCredential(servicePrincipalId, servicePrincipalKey);
            TenantId = tenantId;
            SubscriptionId = subscriptionId;
        }

        public async Task<string> RunPipelineAsync(string resourceGroupName,
                                                   string dataFactoryName,
                                                   string pipelineName,
                                                   Dictionary<string, object> parameters = null,
                                                   Dictionary<string, List<string>> customHeaders = null)
        {
            var runResponse = await Client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroupName, dataFactoryName, pipelineName, parameters: parameters , customHeaders: customHeaders);
            return runResponse.Body.RunId;
        }

        public async Task<object> GetPipelineInfoAsync(string resourceGroup, string dataFactory, string runId)
        {
            var info = await Client.PipelineRuns.GetAsync(resourceGroup, dataFactory, runId);
            return new
            {
                RunId = info.RunId,
                PipelineName = info.PipelineName,
                InvokedBy = info.InvokedBy.Name,
                LastUpdated = info.LastUpdated,
                RunStart = info.RunStart,
                RunEnd = info.RunEnd,
                DurationInMs = info.DurationInMs,
                Status = info.Status,
                Message = info.Message
            };
        }
    }
}