自动重新运行失败的 ADF 管道

Rerunning a failed ADF pipeline automatically

我在 Azure 数据工厂中有多个管道从 API 获取数据,然后将其推送到数据湖。如果其中一条管道出现故障,我会收到警报。然后我转到 ADF 实例并手动重新运行失败的管道。我正在尝试想出一种自动重新运行管道的方法,以防它失败。任何建议或指导都会有所帮助。想到了 Azure 逻辑应用程序或 powerautomate,但事实证明其中没有正确的操作来触发失败的管道。

As of now there is no inbuilt method to automate the process of "rerunning from failed activity" in the ADF, but each activity has a Retry option that you should certainly employ. In the pipeline, you may attempt any action as many times as necessary if it fails.

允许触发器指向带有 Execute activity 的新管道,该管道指向带有副本 activity:

的当前 Azure Datafactory

然后选择高级 -> Wait for completion 选项。 执行管道完成后,webhook 操作应包含停止 DW 的逻辑。

如果管道设计可以修改,那么方法可以是

  1. 设置参数pMax_rerun_count(这是为了确保管道不会进入无限循环)
  2. 设置2个变量: (2.a) Pipeline_status 默认值:失败 (2.b) Max_loop_count 默认值:0;这是为了确保管道不会 运行 循环。该值可以在管道 运行 期间设置,并在管道
  3. 中作为参数传递最大允许重试次数(即 pMax_rerun_count)
  4. 所有的活动都应该在 and Untill activity 里面,它将有表达式 or(equals(Pipeline_status,'Success'),equals(pMax_rerun_count ,Max_loop_count)
  5. Untill activity 中的第一个 activity 将是 Set Variable activity 增加变量的值 Max_loop_count 通过 1 .
  6. 最后的 activity insisde Untill activity 将设置变量 activity 将 Pipeline_status 设置为“成功”

这里的目的是 运行 直到管道中的预期活动成功完成之前阻塞内的所有预期活动。 pMax_rerun_count 是为了确保管道不会进入无限循环。

如果所有管道都需要重新运行以防出现故障

,则可以将此设置视为一个框架

我想出了一个简化的方法来 运行 处理失败的管道。我决定使用 Azure 数据工厂 API 和 Azure 逻辑应用程序来解决这个问题。

我 运行 在计划的 运行 时间对应用程序进行逻辑处理,然后使用以下 API 命令:

GET https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelineruns/?api-version=2018-06-01

此 API 查询为我们提供了所有管道 运行。如果我们想将其过滤为失败值,我们可以向其中添加以下正文:

{
  "lastUpdatedAfter": "2018-06-16T00:36:44.3345758Z",
  "lastUpdatedBefore": "2018-06-16T00:49:48.3686473Z",
  "filters": [
    {
      "operand": "status",
      "operator": "Equals",
      "values": [
        "failed"
      ]
    }
  ]
}

获取失败的管道后,我们可以在每个失败的管道上调用以下 API,以重新 运行 它们:

POST https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelines/{pipelineName}/createRun?api-version=2018-06-01

可以使用脚本语言、powerautomate 工作流或 Azure 逻辑应用程序创建此解决方案。