Airflow: dag_id does not exist or failed to parse
I'm currently learning how to use Apache Airflow and tried to create a simple DAG script like this:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
def print_hello():
    return 'Hello world!'
dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 0 * * *',
          start_date=datetime(2020, 5, 23), catchup=False)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
dummy_operator >> hello_operator
I ran the DAG from the web server and it completed successfully; I even checked the logs:
[2020-05-23 20:43:53,411] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: hello_world.hello_task 2020-05-23T13:42:17.463955+00:00 [queued]>
[2020-05-23 20:43:53,431] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: hello_world.hello_task 2020-05-23T13:42:17.463955+00:00 [queued]>
[2020-05-23 20:43:53,432] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-05-23 20:43:53,432] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-05-23 20:43:53,432] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-05-23 20:43:53,448] {taskinstance.py:900} INFO - Executing <Task(PythonOperator): hello_task> on 2020-05-23T13:42:17.463955+00:00
[2020-05-23 20:43:53,477] {standard_task_runner.py:53} INFO - Started process 7442 to run task
[2020-05-23 20:43:53,685] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: hello_world.hello_task 2020-05-23T13:42:17.463955+00:00 [running]> LAPTOP-9BCTKM5O.localdomain
[2020-05-23 20:43:53,715] {python_operator.py:114} INFO - Done. Returned value was: Hello world!
[2020-05-23 20:43:53,738] {taskinstance.py:1052} INFO - Marking task as SUCCESS.dag_id=hello_world, task_id=hello_task, execution_date=20200523T134217, start_date=20200523T134353, end_date=20200523T134353
[2020-05-23 20:44:03,372] {logging_mixin.py:112} INFO - [2020-05-23 20:44:03,372] {local_task_job.py:103} INFO - Task exited with return code 0
But when I try to test a single task with this command:
airflow test dags/main.py hello_task 2020-05-23
it shows this error:
airflow.exceptions.AirflowException: dag_id could not be found: dags/main.py. Either the dag did not exist or it failed to parse.
Where did I go wrong?
Your airflow test command is slightly off: instead of the path to the DAG file (dags/main.py), you need to pass the dag_id itself, which, looking at your code, is hello_world.
So try this:
airflow test hello_world hello_task 2020-05-23
You should get output similar to this :)
airflow@940836ce7da4:/opt/airflow$ airflow test hello_world hello_task 2020-05-23
[2020-05-23 14:18:51,144] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-05-23 14:18:51,145] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags
[2020-05-23 14:18:51,190] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: hello_world.hello_task 2020-05-23T00:00:00+00:00 [None]>
[2020-05-23 14:18:51,203] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: hello_world.hello_task 2020-05-23T00:00:00+00:00 [None]>
[2020-05-23 14:18:51,203] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-05-23 14:18:51,203] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-05-23 14:18:51,203] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-05-23 14:18:51,204] {taskinstance.py:900} INFO - Executing <Task(PythonOperator): hello_task> on 2020-05-23T00:00:00+00:00
[2020-05-23 14:18:51,234] {python_operator.py:114} INFO - Done. Returned value was: Hello world!
[2020-05-23 14:18:51,249] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=hello_world, task_id=hello_task, execution_date=20200523T000000, start_date=20200523T141851, end_date=20200523T141851
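If you are not sure which dag_id a file defines, the CLI can list the DAGs it has loaded (airflow list_dags in 1.x, airflow dags list in 2.x). As a stdlib-only alternative, here is a minimal sketch that statically scans a DAG file for the id passed to DAG(...). find_dag_ids is a hypothetical helper, not part of Airflow, and it assumes the id is a plain string literal given as the first positional argument or as dag_id=.

```python
import ast
from typing import List


def find_dag_ids(source: str) -> List[str]:
    """Hypothetical helper (not part of Airflow): statically collect the
    string-literal dag_ids passed to DAG(...) calls in a DAG file's source.

    The source is only parsed, never executed, so Airflow need not be installed.
    ast.parse raises SyntaxError if the source itself does not parse.
    """
    dag_ids = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        # Match both DAG(...) and attribute calls such as models.DAG(...).
        func = node.func
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name != "DAG":
            continue
        # The dag_id is the first positional argument or the dag_id= keyword.
        candidates = node.args[:1] + [kw.value for kw in node.keywords if kw.arg == "dag_id"]
        for arg in candidates:
            if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
                dag_ids.append(arg.value)
    return dag_ids


example = """
from airflow import DAG
dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 0 * * *')
"""
print(find_dag_ids(example))  # ['hello_world']
```

Because it only reads literal strings, it will not find ids built dynamically (for example, DAGs generated in a loop). A syntax error in the file surfaces as SyntaxError here, which is one way a DAG file can "fail to parse".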
From Airflow 2.0 onward, the command is:
airflow tasks test dag_id task_id date
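Only the subcommand changed between versions; the positional order dag_id, task_id, date stayed the same. A minimal sketch, assuming you know the major Airflow version you are running: the hypothetical helper airflow_test_command (not part of Airflow) builds the argument list for either version and, as a simple heuristic, rejects values that look like a file path, which is the mistake behind the error above.

```python
from typing import List


def airflow_test_command(dag_id: str, task_id: str, date: str, airflow_major: int) -> List[str]:
    """Hypothetical helper: build the argv for testing a single Airflow task.

    Airflow 1.x uses `airflow test`; 2.0+ uses `airflow tasks test`.
    In both cases the first positional argument is the dag_id, not a file path.
    """
    # Heuristic guard: a dag_id ending in .py or containing "/" is almost
    # certainly a DAG file path passed by mistake.
    if dag_id.endswith(".py") or "/" in dag_id:
        raise ValueError(f"{dag_id!r} looks like a file path; pass the dag_id instead")
    if airflow_major >= 2:
        return ["airflow", "tasks", "test", dag_id, task_id, date]
    return ["airflow", "test", dag_id, task_id, date]


print(" ".join(airflow_test_command("hello_world", "hello_task", "2020-05-23", 1)))
# airflow test hello_world hello_task 2020-05-23
print(" ".join(airflow_test_command("hello_world", "hello_task", "2020-05-23", 2)))
# airflow tasks test hello_world hello_task 2020-05-23
```

On a machine where the airflow CLI is installed, the returned list could be passed to subprocess.run(cmd, check=True).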