Airflow + Docker:路径行为(+Repo)
Airflow + Docker: Path behaviour (+Repo)
我很难理解气流中的路径是如何工作的。我创建了这个存储库,以便很容易理解我的意思:https://github.com/remo2479/airflow_example/blob/master/dags/testdag.py
我根据气流页面上的手册从头开始创建了这个存储库。我刚刚停用了示例 DAG。
正如您在唯一的 DAG (dags/testdag.py) 中看到的那样,DAG 包含两个任务和一个使用打开文件的变量声明。
这两个任务使用存储库中的虚拟 sql 脚本 (dags/testdag/testscript.sql)。有一次我使用 testdag/testscript.sql 作为路径(任务 1),有一次我使用 dags/testdag/testscript.sql (任务 2)。通过连接设置,任务 1 可以工作,而任务 2 不会,因为找不到模板。这就是我希望这两个任务 运行 的方式,因为 dag 在 dags 文件夹中,我们不应该把它放在路径中。
但是当我尝试打开 testscript.sql 并阅读其内容时,我必须将“dags”放在路径中 (dags/testdag/testscript.sql)。为什么在使用 MsSqlOperator 和 open-function 时路径表现不同?
为方便起见,我将整个脚本放在 post:
from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from datetime import datetime
with DAG(
dag_id = "testdag",
schedule_interval="30 6 * * *",
start_date=datetime(2022, 1, 1),
catchup=False) as dag:
# Error because of missing connection - this is how it should be
first_task = MsSqlOperator(
task_id="first_task",
sql="testdag/testscript.sql")
# Error because of template not found
second_task = MsSqlOperator(
task_id="second_task",
sql="dags/testdag/testscript.sql")
# When trying to open the file the path has to contain "dags" in the path - why?
with open("dags/testdag/testscript.sql","r") as file:
f = file.read()
file.close()
first_task
second_task
MsSqlOperator
有 sql
as templated field. This means that Jinja engine will run on the string passed via the sql
parameter. Moreover it has .sql
as templated extension。这意味着操作员知道打开 .sql
文件,读取它的内容并通过 Jinja 引擎传递它,然后再将它提交给 MsSQL 数据库执行。您看到的行为是 Airflow 功能的一部分。您不需要编写代码来从文件中读取查询。 Airflow 会为您做到这一点。 Airflow 只要求您提供查询字符串和连接 - 其余的由 Operator 处理。
那个:
second_task = MsSqlOperator(
task_id="second_task",
sql="dags/testdag/testscript.sql")
抛出模板未找到错误,因为 Airflow 知道在相对于您的 DAG 的路径中查找模板扩展。此路径与您的 DAG 无关。如果您希望此路径可用,请使用 template_searchpath 作为:
with DAG(
...,
template_searchpath=["dags/testdag/"],
) as dag:
那么你的接线员就可以 sql=testscript.sql
至于:
with open("dags/testdag/testscript.sql","r") as file:
f = file.read()
file.close()
这实际上什么都不做。该文件将被打开并从调度程序中读取,因为这是顶级代码。不仅如此 - 这些行将每 30 秒执行一次(默认为 min_file_process_interval,因为 Airflow 会定期扫描您的 .py
文件以搜索 DAG 更新。这也应该回答您为什么需要 dags/
的问题.
使用 template_searchpath
将像@Elad 提到的那样工作,但这是 DAG-specific。
要在不使用 template_searchpath
的情况下在 Airflow 中查找文件,请记住 Airflow 运行的所有内容都从 $AIRFLOW_HOME 目录(即默认情况下的 airflow
或您执行服务来自)。因此,要么从所有导入开始,要么参考与您当前所在的代码文件相关的它们(即 current_dir
来自我之前的回答)。
第一次设置 Airflow 可能很繁琐。
我很难理解气流中的路径是如何工作的。我创建了这个存储库,以便很容易理解我的意思:https://github.com/remo2479/airflow_example/blob/master/dags/testdag.py 我根据气流页面上的手册从头开始创建了这个存储库。我刚刚停用了示例 DAG。
正如您在唯一的 DAG (dags/testdag.py) 中看到的那样,DAG 包含两个任务和一个使用打开文件的变量声明。 这两个任务使用存储库中的虚拟 sql 脚本 (dags/testdag/testscript.sql)。有一次我使用 testdag/testscript.sql 作为路径(任务 1),有一次我使用 dags/testdag/testscript.sql (任务 2)。通过连接设置,任务 1 可以工作,而任务 2 不会,因为找不到模板。这就是我希望这两个任务 运行 的方式,因为 dag 在 dags 文件夹中,我们不应该把它放在路径中。
但是当我尝试打开 testscript.sql 并阅读其内容时,我必须将“dags”放在路径中 (dags/testdag/testscript.sql)。为什么在使用 MsSqlOperator 和 open-function 时路径表现不同?
为方便起见,我将整个脚本放在 post:
from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from datetime import datetime
with DAG(
dag_id = "testdag",
schedule_interval="30 6 * * *",
start_date=datetime(2022, 1, 1),
catchup=False) as dag:
# Error because of missing connection - this is how it should be
first_task = MsSqlOperator(
task_id="first_task",
sql="testdag/testscript.sql")
# Error because of template not found
second_task = MsSqlOperator(
task_id="second_task",
sql="dags/testdag/testscript.sql")
# When trying to open the file the path has to contain "dags" in the path - why?
with open("dags/testdag/testscript.sql","r") as file:
f = file.read()
file.close()
first_task
second_task
MsSqlOperator
有 sql
as templated field. This means that Jinja engine will run on the string passed via the sql
parameter. Moreover it has .sql
as templated extension。这意味着操作员知道打开 .sql
文件,读取它的内容并通过 Jinja 引擎传递它,然后再将它提交给 MsSQL 数据库执行。您看到的行为是 Airflow 功能的一部分。您不需要编写代码来从文件中读取查询。 Airflow 会为您做到这一点。 Airflow 只要求您提供查询字符串和连接 - 其余的由 Operator 处理。
那个:
second_task = MsSqlOperator(
task_id="second_task",
sql="dags/testdag/testscript.sql")
抛出模板未找到错误,因为 Airflow 知道在相对于您的 DAG 的路径中查找模板扩展。此路径与您的 DAG 无关。如果您希望此路径可用,请使用 template_searchpath 作为:
with DAG(
...,
template_searchpath=["dags/testdag/"],
) as dag:
那么你的接线员就可以 sql=testscript.sql
至于:
with open("dags/testdag/testscript.sql","r") as file:
f = file.read()
file.close()
这实际上什么都不做。该文件将被打开并从调度程序中读取,因为这是顶级代码。不仅如此 - 这些行将每 30 秒执行一次(默认为 min_file_process_interval,因为 Airflow 会定期扫描您的 .py
文件以搜索 DAG 更新。这也应该回答您为什么需要 dags/
的问题.
使用 template_searchpath
将像@Elad 提到的那样工作,但这是 DAG-specific。
要在不使用 template_searchpath
的情况下在 Airflow 中查找文件,请记住 Airflow 运行的所有内容都从 $AIRFLOW_HOME 目录(即默认情况下的 airflow
或您执行服务来自)。因此,要么从所有导入开始,要么参考与您当前所在的代码文件相关的它们(即 current_dir
来自我之前的回答)。
第一次设置 Airflow 可能很繁琐。