Airflow ETL 管道 - 在函数中使用计划日期?
Airflow ETL pipeline - using schedule date in functions?
是否可以在您的 Python 函数中引用 default_args start_date?
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 21),
'email': ['mmm.mm@mmm.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
我的 python 脚本主要使用子进程来发出这条语句:
query = '"SELECT * FROM {}.dbo.{} WHERE row_date = \'{}\'"'.format(database, select_database(database)[table_int],
query_date)
command = 'BCP {} queryout \"{}\" -t, -c -a 10240 -S "server" -T'.format(query, os.path.join(path, filename))
我要运行的任务是使用BCP查询'Select * from table where date = {}'。目前,我的 python 脚本具有日期变量的所有逻辑(默认为昨天)。但是,最好参考 default_arg 并让气流处理日期。
所以,为了简化,我想使用 default_arg start_date 和时间表(每天 运行s)来填写我的 BCP 命令上的变量。这是正确的方法还是我应该在 python 脚本中保留日期逻辑?
这是正确的方法,但您真正需要的是 execution_date
,而不是 start_date
。您可以使用 provide_context=True
参数通过 PythonOperator 中的上下文将 execution_date
作为 'ds'
默认变量。 provide_context=True
参数通过 kwargs
参数传递 Jinja 模板中使用的一组默认变量。您可以在文档的相关部分阅读有关默认变量和 Jinja 模板的更多信息。 https://airflow.incubator.apache.org/code.html#default-variables
https://airflow.incubator.apache.org/concepts.html#jinja-templating
您的代码应如下所示:
def query_db(**kwargs):
#get execution date in format YYYY-MM-DD
query_date = kwargs.get('ds')
#rest of your logic
t_query_db = PythonOperator(
task_id='query_db',
python_callable=query_db,
provide_context=True,
dag=dag)
是否可以在您的 Python 函数中引用 default_args start_date?
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 21),
'email': ['mmm.mm@mmm.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
我的 python 脚本主要使用子进程来发出这条语句:
query = '"SELECT * FROM {}.dbo.{} WHERE row_date = \'{}\'"'.format(database, select_database(database)[table_int],
query_date)
command = 'BCP {} queryout \"{}\" -t, -c -a 10240 -S "server" -T'.format(query, os.path.join(path, filename))
我要运行的任务是使用BCP查询'Select * from table where date = {}'。目前,我的 python 脚本具有日期变量的所有逻辑(默认为昨天)。但是,最好参考 default_arg 并让气流处理日期。
所以,为了简化,我想使用 default_arg start_date 和时间表(每天 运行s)来填写我的 BCP 命令上的变量。这是正确的方法还是我应该在 python 脚本中保留日期逻辑?
这是正确的方法,但您真正需要的是 execution_date
,而不是 start_date
。您可以使用 provide_context=True
参数通过 PythonOperator 中的上下文将 execution_date
作为 'ds'
默认变量。 provide_context=True
参数通过 kwargs
参数传递 Jinja 模板中使用的一组默认变量。您可以在文档的相关部分阅读有关默认变量和 Jinja 模板的更多信息。 https://airflow.incubator.apache.org/code.html#default-variables
https://airflow.incubator.apache.org/concepts.html#jinja-templating
您的代码应如下所示:
def query_db(**kwargs):
#get execution date in format YYYY-MM-DD
query_date = kwargs.get('ds')
#rest of your logic
t_query_db = PythonOperator(
task_id='query_db',
python_callable=query_db,
provide_context=True,
dag=dag)