运行 指定日期的气流作业
running airflow job on specified dates
我需要在每年的指定日期(即 3 月 31 日和 12 月 31 日)将我的 spark v3.0.2 工作安排到 运行。
我正在使用气流进行调度。
如何处理这个用例?
这里有一些选项:
选项 1:
- 创建一个 dag,将日期作为第一步的 PytonOperator,如果不是 Dec31 或 Mar31,则失败。
- 将第一步设为 必需 到 运行 下一步。
选项 2:
- 为每个日期创建一个 运行 每年的 dag。这看起来很糟糕,但可以使用单个 python 文件轻松完成,如下所示:
# Create a dag for an exact date
def createYearlyDagForDate(startdate):
with DAG(startdate=startdate,
task_id=f"createdagdordate_{startdate.strftime(month_%m_day_%d)}"
schedule_interval="@yearly") as dag:
sparkjob = SparkSubmitOperator(...)
return dag
for x in [datetime(2021,12,31), datetime(2021,03,31) ]:
createYearlyDag(x)
这里的技巧是每个 dag 都有一个 task_id
。如果您在 dag 中重复使用 task_id,您将覆盖 dag,并且只会声明一个。
如果您只想 运行 您的工作仅在 3 月 31 日和 12 月 31 日,您可以在 DAG 定义的 schedule_interval
参数中设置 cron expression。
Cron 表达式将是 0 0 31 3,12 *
并且可以在第 3 个月(3 月)和第 12 个月(12 月)的第 31 天午夜转换为 运行。因此你的 DAG 定义应该是:
from airflow import DAG
your_dag = DAG(
dag_id='your_dag_id',
...
schedule_interval='0 0 31 3,12 *',
...
)
对于更复杂的情况,例如运行4月15日和8月23日不能用cron表达式定义,我想你应该这样做
我需要在每年的指定日期(即 3 月 31 日和 12 月 31 日)将我的 spark v3.0.2 工作安排到 运行。 我正在使用气流进行调度。
如何处理这个用例?
这里有一些选项:
选项 1:
- 创建一个 dag,将日期作为第一步的 PytonOperator,如果不是 Dec31 或 Mar31,则失败。
- 将第一步设为 必需 到 运行 下一步。
选项 2:
- 为每个日期创建一个 运行 每年的 dag。这看起来很糟糕,但可以使用单个 python 文件轻松完成,如下所示:
# Create a dag for an exact date
def createYearlyDagForDate(startdate):
with DAG(startdate=startdate,
task_id=f"createdagdordate_{startdate.strftime(month_%m_day_%d)}"
schedule_interval="@yearly") as dag:
sparkjob = SparkSubmitOperator(...)
return dag
for x in [datetime(2021,12,31), datetime(2021,03,31) ]:
createYearlyDag(x)
这里的技巧是每个 dag 都有一个 task_id
。如果您在 dag 中重复使用 task_id,您将覆盖 dag,并且只会声明一个。
如果您只想 运行 您的工作仅在 3 月 31 日和 12 月 31 日,您可以在 DAG 定义的 schedule_interval
参数中设置 cron expression。
Cron 表达式将是 0 0 31 3,12 *
并且可以在第 3 个月(3 月)和第 12 个月(12 月)的第 31 天午夜转换为 运行。因此你的 DAG 定义应该是:
from airflow import DAG
your_dag = DAG(
dag_id='your_dag_id',
...
schedule_interval='0 0 31 3,12 *',
...
)
对于更复杂的情况,例如运行4月15日和8月23日不能用cron表达式定义,我想你应该这样做