功能中的气流 DAG?
Airflow DAG in functions?
我在 $AIRFLOW_HOME/dags
工作。我创建了以下文件:
- common
|- __init__.py # empty
|- common.py # common code
- foo_v1.py # dag instanciation
在common.py
中:
default_args = ...
def create_dag(project, version):
dag_id = project + '_' + version
dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
print('creating DAG ' + dag_id)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
t2.set_upstream(t1)
在foo_v1.py
中:
from common.common import create_dag
create_dag('foo', 'v1')
使用 python 测试脚本时,它看起来正常:
$ python foo_v1.py
[2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0
然后我在本地启动网络服务器和调度程序。我的问题是我没有看到任何 ID 为 foo_v1
的 DAG。没有正在创建 pyc
文件。做错了什么?为什么foo_v1.py
中的代码没有被执行?
您需要将 dag 分配给模块中的导出变量。如果 dag 不在模块中 __dict__
airflow 的 DagBag 处理器将不会拾取它。
在此处查看来源:https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L428
要被 Airflow 找到,create_dag()
返回的 DAG 对象必须位于 foo_v1.py
模块的全局命名空间中。将 DAG 放置在全局命名空间中的一种方法是简单地将其分配给模块级变量:
from common.common import create_dag
dag = create_dag('foo', 'v1')
另一种方法是使用 globals()
:
更新全局命名空间
globals()['foo_v1'] = create_dag('foo', 'v1')
后者可能看起来有点矫枉过正,但它对 creating multiple DAGs dynamically 很有用。例如,在 for 循环中:
for i in range(10):
globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')
注意: 放置在 $AIRFLOW_HOME/dags
中的任何 *.py
文件(即使在子目录中,如您的情况下的 common
)将被Airflow解析。如果你不想这样,你可以使用 .airflowignore
or packaged DAGs.
如 here 中所述,您 必须 return 创建 dag 后!
default_args = ...
def create_dag(project, version):
dag_id = project + '_' + version
dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
print('creating DAG ' + dag_id)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
t2.set_upstream(t1)
return dag # Add this line to your code!
我在 $AIRFLOW_HOME/dags
工作。我创建了以下文件:
- common
|- __init__.py # empty
|- common.py # common code
- foo_v1.py # dag instanciation
在common.py
中:
default_args = ...
def create_dag(project, version):
dag_id = project + '_' + version
dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
print('creating DAG ' + dag_id)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
t2.set_upstream(t1)
在foo_v1.py
中:
from common.common import create_dag
create_dag('foo', 'v1')
使用 python 测试脚本时,它看起来正常:
$ python foo_v1.py
[2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0
然后我在本地启动网络服务器和调度程序。我的问题是我没有看到任何 ID 为 foo_v1
的 DAG。没有正在创建 pyc
文件。做错了什么?为什么foo_v1.py
中的代码没有被执行?
您需要将 dag 分配给模块中的导出变量。如果 dag 不在模块中 __dict__
airflow 的 DagBag 处理器将不会拾取它。
在此处查看来源:https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L428
要被 Airflow 找到,create_dag()
返回的 DAG 对象必须位于 foo_v1.py
模块的全局命名空间中。将 DAG 放置在全局命名空间中的一种方法是简单地将其分配给模块级变量:
from common.common import create_dag
dag = create_dag('foo', 'v1')
另一种方法是使用 globals()
:
globals()['foo_v1'] = create_dag('foo', 'v1')
后者可能看起来有点矫枉过正,但它对 creating multiple DAGs dynamically 很有用。例如,在 for 循环中:
for i in range(10):
globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')
注意: 放置在 $AIRFLOW_HOME/dags
中的任何 *.py
文件(即使在子目录中,如您的情况下的 common
)将被Airflow解析。如果你不想这样,你可以使用 .airflowignore
or packaged DAGs.
如 here 中所述,您 必须 return 创建 dag 后!
default_args = ...
def create_dag(project, version):
dag_id = project + '_' + version
dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
print('creating DAG ' + dag_id)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
t2.set_upstream(t1)
return dag # Add this line to your code!