气流调度程序没有接受工作
Airflow scheduler is not picking up job
我使用以下参数创建了一个新的 Dag:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'catchup': False,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc':False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
dag = DAG(
'sample_dag',
default_args=default_args,
description='sample dag',
schedule_interval="44 * * * *")
但是调度程序并没有在时机成熟时拿起 dag。而且我手动触发时运行很好。我在这里遗漏了什么吗?
此外,当 cron 表达式为 "*/5 * * * *"
时,调度程序会抛出错误
CroniterBadCronError: Exactly 5 or 6 columns has to be specified for iteratorexpression.
但 cron 表达式看起来不错。
原因是[time the dag runs]
= start_date
+ schedule_interval
。因此,如果您将 start_date
设置为动态的东西,那么 dag 将永远不会执行,因为 start_date
不断增加......好吧......时间。
在堆栈上有解释 here and there is also another question here 也有答案,他们可能比我解释得更好。
您应该将 start_date
更改为静态而不是 datetime.now()
如果您不想在您的 dag 中回填,您需要将 catchup=False
设置为 dag 参数。所以像下面这样:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'sample_dag',
catchup=False,
default_args=default_args,
description='sample dag',
schedule_interval="44 * * * *"
)
我使用以下参数创建了一个新的 Dag:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'catchup': False,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc':False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
dag = DAG(
'sample_dag',
default_args=default_args,
description='sample dag',
schedule_interval="44 * * * *")
但是调度程序并没有在时机成熟时拿起 dag。而且我手动触发时运行很好。我在这里遗漏了什么吗?
此外,当 cron 表达式为 "*/5 * * * *"
CroniterBadCronError: Exactly 5 or 6 columns has to be specified for iteratorexpression.
但 cron 表达式看起来不错。
原因是[time the dag runs]
= start_date
+ schedule_interval
。因此,如果您将 start_date
设置为动态的东西,那么 dag 将永远不会执行,因为 start_date
不断增加......好吧......时间。
在堆栈上有解释 here and there is also another question here 也有答案,他们可能比我解释得更好。
您应该将 start_date
更改为静态而不是 datetime.now()
如果您不想在您的 dag 中回填,您需要将 catchup=False
设置为 dag 参数。所以像下面这样:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'sample_dag',
catchup=False,
default_args=default_args,
description='sample dag',
schedule_interval="44 * * * *"
)