Airflow - 告诉 DAG 跳过每月 2 号的处理
Airflow - Tell DAG to skip processing the 2nd of each month
我构建了一个简单的 AirFlow 管道来处理传入的历史信息。由于数据的性质,每个月的第 2 天的交易量要大得多。在那些日子里,我想以 10 分钟为增量处理数据:*/10 * 2 * *
剩下的日子,我想正常处理,以2小时为增量'0 */2 * * *'
如何告诉我的 "normal" DAG 跳过处理每个月的 2 号?
DAG 本身非常简单:
dag = DAG(
dag_name,
catchup=True,
default_args=default_args,
schedule_interval=schedule_interval
)
with dag:
historical = HistoricalToS3Operator(
task_id=dag_name + '_extract',
model=HistoricalModel.INVOICES
)
redshift = S3ToRedshiftOperator(
task_id=dag_name + '_load',
load_type='upsert',
type_check=False,
primary_key=primary_key,
distkey=distkey,
sortkey=sortkey,
incremental_key=incremental_key,
data_type='json',
dag=dag
)
historical >> redshift
Airflow 使用 python 库 croniter,这就是使用示例易于解释的原因:
试试这个:
iter = croniter('0 0 1,3-31 * *', base)
In [27]: print(iter.get_next(datetime))
2010-02-26 00:00:00
In [28]: print(iter.get_next(datetime))
2010-02-27 00:00:00
In [29]: print(iter.get_next(datetime))
2010-02-28 00:00:00
In [30]: print(iter.get_next(datetime))
2010-03-01 00:00:00
In [31]: print(iter.get_next(datetime))
2010-03-03 00:00:00
我构建了一个简单的 AirFlow 管道来处理传入的历史信息。由于数据的性质,每个月的第 2 天的交易量要大得多。在那些日子里,我想以 10 分钟为增量处理数据:*/10 * 2 * *
剩下的日子,我想正常处理,以2小时为增量'0 */2 * * *'
如何告诉我的 "normal" DAG 跳过处理每个月的 2 号?
DAG 本身非常简单:
dag = DAG(
dag_name,
catchup=True,
default_args=default_args,
schedule_interval=schedule_interval
)
with dag:
historical = HistoricalToS3Operator(
task_id=dag_name + '_extract',
model=HistoricalModel.INVOICES
)
redshift = S3ToRedshiftOperator(
task_id=dag_name + '_load',
load_type='upsert',
type_check=False,
primary_key=primary_key,
distkey=distkey,
sortkey=sortkey,
incremental_key=incremental_key,
data_type='json',
dag=dag
)
historical >> redshift
Airflow 使用 python 库 croniter,这就是使用示例易于解释的原因:
试试这个:
iter = croniter('0 0 1,3-31 * *', base)
In [27]: print(iter.get_next(datetime))
2010-02-26 00:00:00
In [28]: print(iter.get_next(datetime))
2010-02-27 00:00:00
In [29]: print(iter.get_next(datetime))
2010-02-28 00:00:00
In [30]: print(iter.get_next(datetime))
2010-03-01 00:00:00
In [31]: print(iter.get_next(datetime))
2010-03-03 00:00:00