无法 运行 Airflow 中的任务
Can't able to run a Task in Airflow
每当我尝试 运行 我的 DAG 时,它都会获得成功状态,但它无法 运行 任务。
虽然 运行ning 第一次完美运行
trigger_controller_dag.py:
def conditionally_trigger(context, dag_run_obj):
c_p = context['params']['condition_param']
if context['params']['condition_param']:
dag_run_obj.payload = {'message': context['params']['message']}
pp.pprint(dag_run_obj.payload)
return dag_run_obj
dag = DAG(
dag_id='example_trigger_controller_dag',
default_args={
"owner": "airflow",
"start_date": datetime.utcnow(),
},
schedule_interval='@once',
)
trigger = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id="example_trigger_target_dag",
python_callable=conditionally_trigger,
params={'condition_param': True, 'message': 'Hello World'},
dag=dag,
)
trigger_target_dag.py:
args = {
'start_date': datetime.utcnow(),
'owner': 'airflow',
}
dag = DAG(
dag_id='example_trigger_target_dag',
default_args=args,
schedule_interval=None,
)
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
在 运行 处理 DAG 时出现以下错误,
dependency 'Task Instance State' FAILED:任务处于 'success' 状态,这不是执行的有效状态。必须清除任务才能运行
而不是 schedule_interval='@once'
试试 schedule_interval=None
.
@once
表示只会运行一次。
您的 start_date
两个 DAGS 中的变量都是动态的,可能会导致您的问题 'start_date': datetime.utcnow()
。不建议将它们设置为动态启动,这会导致错误。
尝试将其设置为静态开始日期,例如 'start_date': datetime(2019, 5, 29) #year month day
We recommend against using dynamic values as start_date, especially
datetime.now() as it can be quite confusing. The task is triggered
once the period closes, and in theory an @hourly DAG would never get
to an hour after now as now() moves along.
关于此的另一个 SO 问题:why dynamic start causes issues
每当我尝试 运行 我的 DAG 时,它都会获得成功状态,但它无法 运行 任务。 虽然 运行ning 第一次完美运行
trigger_controller_dag.py:
def conditionally_trigger(context, dag_run_obj):
c_p = context['params']['condition_param']
if context['params']['condition_param']:
dag_run_obj.payload = {'message': context['params']['message']}
pp.pprint(dag_run_obj.payload)
return dag_run_obj
dag = DAG(
dag_id='example_trigger_controller_dag',
default_args={
"owner": "airflow",
"start_date": datetime.utcnow(),
},
schedule_interval='@once',
)
trigger = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id="example_trigger_target_dag",
python_callable=conditionally_trigger,
params={'condition_param': True, 'message': 'Hello World'},
dag=dag,
)
trigger_target_dag.py:
args = {
'start_date': datetime.utcnow(),
'owner': 'airflow',
}
dag = DAG(
dag_id='example_trigger_target_dag',
default_args=args,
schedule_interval=None,
)
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
在 运行 处理 DAG 时出现以下错误, dependency 'Task Instance State' FAILED:任务处于 'success' 状态,这不是执行的有效状态。必须清除任务才能运行
而不是 schedule_interval='@once'
试试 schedule_interval=None
.
@once
表示只会运行一次。
您的 start_date
两个 DAGS 中的变量都是动态的,可能会导致您的问题 'start_date': datetime.utcnow()
。不建议将它们设置为动态启动,这会导致错误。
尝试将其设置为静态开始日期,例如 'start_date': datetime(2019, 5, 29) #year month day
We recommend against using dynamic values as start_date, especially datetime.now() as it can be quite confusing. The task is triggered once the period closes, and in theory an @hourly DAG would never get to an hour after now as now() moves along.
关于此的另一个 SO 问题:why dynamic start causes issues