当 catchup 为 True 时,Airflow 任务设置为“no_status”
Airflow tasks set to `no_status` when catchup is True
我正在尝试配置一系列 Airflow 任务来回填一些数据 (catchup=True)。 DAG 部署并取消暂停后,第一个作业成功运行,但所有后续运行的任务都设置为 no_status
,并且它们永远不会启动。
我尝试了重命名 DAG、重新启动 Airflow 服务器和调度程序、清除旧日志的变体,但我在这里没有取得任何进展。
想法?
DAG代码:
default_args = {
"owner": "me",
"retries": 2,
"retry_delay": timedelta(minutes=2),
"sla": timedelta(hours=1),
"start_date": "2021-01-01T00:00",
}
dag = DAG(
catchup=True,
dag_id="ingest_dag_testing_6",
dagrun_timeout=timedelta(hours=1),
default_args=default_args,
max_active_runs=1,
schedule_interval="30 * * * *",
)
DATA_SOURCE_TYPES = [
{
"target_name": "task_a",
"children": [
{
"target_name": "subtask_a1",
},
{
"target_name": "subtask_a2",
},
],
}
]
with dag:
for dst in DATA_SOURCE_TYPES:
sub_ingest_tasks = []
ingest_task = PythonOperator(
task_id=f"ingest_{dst.get('target_name')}",
python_callable=run_ingestion,
op_args=[logger, exe_date, dst],
)
if dst.get("children"):
for sdst in dst.get("children"):
sub_ingest_tasks.append(
PythonOperator(
task_id=f"ingest_{sdst.get('target_name')}",
python_callable=run_ingestion,
op_args=[logger, exe_date, sdst],
)
)
ingest_task >> sub_ingest_tasks
你的代码执行得很好。
我根据您的代码创建了一个可运行的示例(因为它缺少 imports/callables):
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta
def run_ingestion(**context):
print("Hello World")
default_args = {
"owner": "me",
"retries": 2,
"retry_delay": timedelta(minutes=2),
"sla": timedelta(hours=1),
"start_date": "2021-01-01T00:00",
}
dag = DAG(
catchup=True,
dag_id="ingest_dag_testing_6",
dagrun_timeout=timedelta(hours=1),
default_args=default_args,
max_active_runs=1,
schedule_interval="30 * * * *",
)
DATA_SOURCE_TYPES = [
{
"target_name": "task_a",
"children": [
{
"target_name": "subtask_a1",
},
{
"target_name": "subtask_a2",
},
],
}
]
with dag:
for dst in DATA_SOURCE_TYPES:
sub_ingest_tasks = []
ingest_task = PythonOperator(
task_id=f"ingest_{dst.get('target_name')}",
python_callable=run_ingestion,
#op_args=[logger, exe_date, dst],
)
if dst.get("children"):
for sdst in dst.get("children"):
sub_ingest_tasks.append(
PythonOperator(
task_id=f"ingest_{sdst.get('target_name')}",
python_callable=run_ingestion,
#op_args=[logger, exe_date, sdst],
)
)
ingest_task >> sub_ingest_tasks
您可以看到它工作正常:
如果您是 运行 旧 Airflow 版本,更改 dag_id
可能会解决问题。可能是有一些与此 dag_id 相关的数据库记录的旧痕迹未正确清理。调度程序在以后的版本中进行了重大重构。
如果上述方法没有帮助,可能唯一的解决方案是升级到最新的 Airflow 版本,因为它可能是旧版本中的一个错误,该错误已在此过程中得到修复(因为您共享的代码不会重现该问题您在最新的 Airflow 版本中描述。
我正在尝试配置一系列 Airflow 任务来回填一些数据 (catchup=True)。 DAG 部署并取消暂停后,第一个作业成功运行,但所有后续运行的任务都设置为 no_status
,并且它们永远不会启动。
我尝试了重命名 DAG、重新启动 Airflow 服务器和调度程序、清除旧日志的变体,但我在这里没有取得任何进展。
想法?
DAG代码:
default_args = {
"owner": "me",
"retries": 2,
"retry_delay": timedelta(minutes=2),
"sla": timedelta(hours=1),
"start_date": "2021-01-01T00:00",
}
dag = DAG(
catchup=True,
dag_id="ingest_dag_testing_6",
dagrun_timeout=timedelta(hours=1),
default_args=default_args,
max_active_runs=1,
schedule_interval="30 * * * *",
)
DATA_SOURCE_TYPES = [
{
"target_name": "task_a",
"children": [
{
"target_name": "subtask_a1",
},
{
"target_name": "subtask_a2",
},
],
}
]
with dag:
for dst in DATA_SOURCE_TYPES:
sub_ingest_tasks = []
ingest_task = PythonOperator(
task_id=f"ingest_{dst.get('target_name')}",
python_callable=run_ingestion,
op_args=[logger, exe_date, dst],
)
if dst.get("children"):
for sdst in dst.get("children"):
sub_ingest_tasks.append(
PythonOperator(
task_id=f"ingest_{sdst.get('target_name')}",
python_callable=run_ingestion,
op_args=[logger, exe_date, sdst],
)
)
ingest_task >> sub_ingest_tasks
你的代码执行得很好。
我根据您的代码创建了一个可运行的示例(因为它缺少 imports/callables):
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta
def run_ingestion(**context):
print("Hello World")
default_args = {
"owner": "me",
"retries": 2,
"retry_delay": timedelta(minutes=2),
"sla": timedelta(hours=1),
"start_date": "2021-01-01T00:00",
}
dag = DAG(
catchup=True,
dag_id="ingest_dag_testing_6",
dagrun_timeout=timedelta(hours=1),
default_args=default_args,
max_active_runs=1,
schedule_interval="30 * * * *",
)
DATA_SOURCE_TYPES = [
{
"target_name": "task_a",
"children": [
{
"target_name": "subtask_a1",
},
{
"target_name": "subtask_a2",
},
],
}
]
with dag:
for dst in DATA_SOURCE_TYPES:
sub_ingest_tasks = []
ingest_task = PythonOperator(
task_id=f"ingest_{dst.get('target_name')}",
python_callable=run_ingestion,
#op_args=[logger, exe_date, dst],
)
if dst.get("children"):
for sdst in dst.get("children"):
sub_ingest_tasks.append(
PythonOperator(
task_id=f"ingest_{sdst.get('target_name')}",
python_callable=run_ingestion,
#op_args=[logger, exe_date, sdst],
)
)
ingest_task >> sub_ingest_tasks
您可以看到它工作正常:
如果您是 运行 旧 Airflow 版本,更改 dag_id
可能会解决问题。可能是有一些与此 dag_id 相关的数据库记录的旧痕迹未正确清理。调度程序在以后的版本中进行了重大重构。
如果上述方法没有帮助,可能唯一的解决方案是升级到最新的 Airflow 版本,因为它可能是旧版本中的一个错误,该错误已在此过程中得到修复(因为您共享的代码不会重现该问题您在最新的 Airflow 版本中描述。