如果之前的任务执行需要更多时间,气流计划将被跳过
Airflow schedule getting skipped if previous task execution takes more time
我的气流 DAG 中有两个任务。一个触发 API 调用(Http 运算符),另一个使用另一个 api(Http 传感器)继续检查其状态。此 DAG 计划 运行 每小时 10 分钟。但有时一次执行可能需要很长时间才能完成,例如 20 小时。在这种情况下,上一个任务 运行ning 未执行时的所有计划都没有执行。
例如,假设我在 01:10 的工作需要 10 个小时才能完成。计划 02:10、03:10、04:10、... 11:10 等应该 运行 被跳过,只有 12:10 的计划被执行。
我正在使用本地执行器。我正在使用以下脚本 运行ning 气流服务器和调度程序。
start_server.sh
export AIRFLOW_HOME=./airflow_home;
export AIRFLOW_GPL_UNIDECODE=yes;
export AIRFLOW_CONN_REST_API=http://localhost:5000;
export AIRFLOW_CONN_MANAGEMENT_API=http://localhost:8001;
airflow initdb;
airflow webserver -p 7200;
start_scheduler.sh
export AIRFLOW_HOME=./airflow_home;
# Connection string for connecting to REST interface server
export AIRFLOW_CONN_REST_API=http://localhost:5000;
export AIRFLOW_CONN_MANAGEMENT_API=http://localhost:8001;
#export AIRFLOW__SMTP__SMTP_PASSWORD=**********;
airflow scheduler;
my_dag_file.py
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': admin_email_ids,
'email_on_failure': False,
'email_on_retry': False
}
DAG_ID = 'reconciliation_job_pipeline'
MANAGEMENT_RES_API_CONNECTION_CONFIG = 'management_api'
DA_REST_API_CONNECTION_CONFIG = 'rest_api'
recon_schedule = Variable.get('recon_cron_expression',"10 * * * *")
dag = DAG(DAG_ID, max_active_runs=1, default_args=default_args,
schedule_interval=recon_schedule,
catchup=False)
dag.doc_md = __doc__
spark_job_end_point = conf['sip_da']['spark_job_end_point']
fetch_index_record_count_config_key = conf['reconciliation'][
'fetch_index_record_count']
fetch_index_record_count = SparkJobOperator(
job_id_key='fetch_index_record_count_job',
config_key=fetch_index_record_count_config_key,
exec_id_req=False,
dag=dag,
http_conn_id=DA_REST_API_CONNECTION_CONFIG,
task_id='fetch_index_record_count_job',
data={},
method='POST',
endpoint=spark_job_end_point,
headers={
"Content-Type": "application/json"}
)
job_endpoint = conf['sip_da']['job_resource_endpoint']
fetch_index_record_count_status_job = JobStatusSensor(
job_id_key='fetch_index_record_count_job',
http_conn_id=DA_REST_API_CONNECTION_CONFIG,
task_id='fetch_index_record_count_status_job',
endpoint=job_endpoint,
method='GET',
request_params={'required': 'status'},
headers={"Content-Type": "application/json"},
dag=dag,
poke_interval=15
)
fetch_index_record_count>>fetch_index_record_count_status_job
SparkJobOperator
& JobStatusSensor
我的自定义 class 扩展 SimpleHttpOperator
& HttpSensor
.
如果我设置 depends_on_past
true
它会按预期工作吗?。这个选项的另一个问题是状态检查作业有时会失败。但是下一个时间表应该会触发。我怎样才能实现这种行为?
我认为这里的主要讨论点是你设置的是catchup=False
,更详细的可以找到here。所以气流调度器将跳过那些任务执行,你会看到你提到的行为。
这听起来好像如果之前的过程花费的时间比预期的要长,您将需要执行追赶。你可以试试改一下catchup=True
我的气流 DAG 中有两个任务。一个触发 API 调用(Http 运算符),另一个使用另一个 api(Http 传感器)继续检查其状态。此 DAG 计划 运行 每小时 10 分钟。但有时一次执行可能需要很长时间才能完成,例如 20 小时。在这种情况下,上一个任务 运行ning 未执行时的所有计划都没有执行。
例如,假设我在 01:10 的工作需要 10 个小时才能完成。计划 02:10、03:10、04:10、... 11:10 等应该 运行 被跳过,只有 12:10 的计划被执行。
我正在使用本地执行器。我正在使用以下脚本 运行ning 气流服务器和调度程序。
start_server.sh
export AIRFLOW_HOME=./airflow_home;
export AIRFLOW_GPL_UNIDECODE=yes;
export AIRFLOW_CONN_REST_API=http://localhost:5000;
export AIRFLOW_CONN_MANAGEMENT_API=http://localhost:8001;
airflow initdb;
airflow webserver -p 7200;
start_scheduler.sh
export AIRFLOW_HOME=./airflow_home;
# Connection string for connecting to REST interface server
export AIRFLOW_CONN_REST_API=http://localhost:5000;
export AIRFLOW_CONN_MANAGEMENT_API=http://localhost:8001;
#export AIRFLOW__SMTP__SMTP_PASSWORD=**********;
airflow scheduler;
my_dag_file.py
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': admin_email_ids,
'email_on_failure': False,
'email_on_retry': False
}
DAG_ID = 'reconciliation_job_pipeline'
MANAGEMENT_RES_API_CONNECTION_CONFIG = 'management_api'
DA_REST_API_CONNECTION_CONFIG = 'rest_api'
recon_schedule = Variable.get('recon_cron_expression',"10 * * * *")
dag = DAG(DAG_ID, max_active_runs=1, default_args=default_args,
schedule_interval=recon_schedule,
catchup=False)
dag.doc_md = __doc__
spark_job_end_point = conf['sip_da']['spark_job_end_point']
fetch_index_record_count_config_key = conf['reconciliation'][
'fetch_index_record_count']
fetch_index_record_count = SparkJobOperator(
job_id_key='fetch_index_record_count_job',
config_key=fetch_index_record_count_config_key,
exec_id_req=False,
dag=dag,
http_conn_id=DA_REST_API_CONNECTION_CONFIG,
task_id='fetch_index_record_count_job',
data={},
method='POST',
endpoint=spark_job_end_point,
headers={
"Content-Type": "application/json"}
)
job_endpoint = conf['sip_da']['job_resource_endpoint']
fetch_index_record_count_status_job = JobStatusSensor(
job_id_key='fetch_index_record_count_job',
http_conn_id=DA_REST_API_CONNECTION_CONFIG,
task_id='fetch_index_record_count_status_job',
endpoint=job_endpoint,
method='GET',
request_params={'required': 'status'},
headers={"Content-Type": "application/json"},
dag=dag,
poke_interval=15
)
fetch_index_record_count>>fetch_index_record_count_status_job
SparkJobOperator
& JobStatusSensor
我的自定义 class 扩展 SimpleHttpOperator
& HttpSensor
.
如果我设置 depends_on_past
true
它会按预期工作吗?。这个选项的另一个问题是状态检查作业有时会失败。但是下一个时间表应该会触发。我怎样才能实现这种行为?
我认为这里的主要讨论点是你设置的是catchup=False
,更详细的可以找到here。所以气流调度器将跳过那些任务执行,你会看到你提到的行为。
这听起来好像如果之前的过程花费的时间比预期的要长,您将需要执行追赶。你可以试试改一下catchup=True