DAG 在 Airflow 上不可见 UI
DAG is not visible on Airflow UI
这是我在 dags
文件夹中的 dag 文件。
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from work_file import Test
class Main(Test):
def __init__(self):
super(Test, self).__init__()
def create_dag(self):
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2015, 6, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG("python_dag", default_args=default_args, schedule_interval='0 * * * *')
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=self.my_func)
dummy_task >> python_task
if __name__ == "__main__":
a = Main()
a.create_dag()
这是我的另一个文件 work_file.py
,它位于同一个 dags
文件夹中。
class Test:
def __init__(self):
pass
def my_func(self):
return "Hello"
目标:-目标是从我的 dag 文件中调用 my_func
。
问题:- UI 上似乎没有错误,但是我的 dag python_dag
不可见。
我的服务器,调度程序也是 运行,我试过重新启动相同但没有任何反应。
我也导入了文件(from work_file import Test
)
提前致谢!
DAG 存在多个问题:
- 运算符未分配给任何 DAG。将
dag=dag
添加到构造函数中。例如,DummyOperator(..., dag=dag)
.
create_dag()
没有 return DAG。添加 return dag
.
- DAG 脚本未作为顶级代码执行。也就是说,模块
__name__
永远不会是 '__main__'
。删除 if __name__ == "__main__":
.
- DAG 对象必须在模块的全局命名空间中。将
create_dag()
的 return 值分配给变量:dag = a.create_dag()
.
这是我在 dags
文件夹中的 dag 文件。
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from work_file import Test
class Main(Test):
def __init__(self):
super(Test, self).__init__()
def create_dag(self):
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2015, 6, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG("python_dag", default_args=default_args, schedule_interval='0 * * * *')
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=self.my_func)
dummy_task >> python_task
if __name__ == "__main__":
a = Main()
a.create_dag()
这是我的另一个文件 work_file.py
,它位于同一个 dags
文件夹中。
class Test:
def __init__(self):
pass
def my_func(self):
return "Hello"
目标:-目标是从我的 dag 文件中调用 my_func
。
问题:- UI 上似乎没有错误,但是我的 dag python_dag
不可见。
我的服务器,调度程序也是 运行,我试过重新启动相同但没有任何反应。
我也导入了文件(from work_file import Test
)
提前致谢!
DAG 存在多个问题:
- 运算符未分配给任何 DAG。将
dag=dag
添加到构造函数中。例如,DummyOperator(..., dag=dag)
. create_dag()
没有 return DAG。添加return dag
.- DAG 脚本未作为顶级代码执行。也就是说,模块
__name__
永远不会是'__main__'
。删除if __name__ == "__main__":
. - DAG 对象必须在模块的全局命名空间中。将
create_dag()
的 return 值分配给变量:dag = a.create_dag()
.