集成 Azure 数据工厂和气流

Integrating azure data factory and airflow

我的最终目标是 运行 使用 Airflow 的 Azure 数据工厂 (ADF) 管道。我当前的设置是一个 docker 文件,其中包含 python 所需的包,例如 azure 数据提供程序和来自 apache airflow 的 helm chart。我有一个带有 celery 执行器的自定义 values.yaml,我每次都在本地升级到 运行 气流。

到目前为止这部分是成功的。

python 包提供了一个天蓝色数据工厂选项,就像我想要的那样,但我无法验证这些。我使用 python 代码测试了相同的凭据(资源组、客户端 ID、客户端机密、租户 ID、订阅 ID),它是有效的。

这在 Airflow 上不起作用,我看到了这个错误

*** Trying to get logs (last 100 lines) from worker pod azuredatafactoryrunpipeline.3f394072d09e4c6e8d59566776f18b78 ***
*** Unable to fetch logs from worker pod azuredatafactoryrunpipeline.3f394072d09e4c6e8d59566776f18b78 ***
(404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '4a506d8b-9bac-44d8-8a7c-ee01810e478f', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Mon, 07 Feb 2022 22:05:52 GMT', 'Content-Length': '288'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"azuredatafactoryrunpipeline.3f394072d09e4c6e8d59566776f18b78\" not found","reason":"NotFound","details":{"name":"azuredatafactoryrunpipeline.3f394072d09e4c6e8d59566776f18b78","kind":"pods"},"code":404}\n'

我该如何调试?

有没有办法从 Airflow 测试 ADF 连接?

Airflow 有 AzureDataFactoryRunPipelineOperator 用于在数据工厂内执行管道(参见 docs

您还可以添加传感器以等待管道完成,然后再继续执行您的 Airflow 工作流程。

用法示例:

from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor

run_pipeline1 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline",
    pipeline_name="pipeline",
    parameters={"myParam": "value"},
)

pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor",
    run_id=run_pipeline.output["run_id"],
)
run_pipeline2 >> pipeline_run_sensor

要在 Airflow 中定义 ADF 连接,请检查此 doc