Airflow Python 运算符中的宏
Macros in the Airflow Python operator
我可以在 PythonOperator 中使用宏吗?我尝试了以下操作,但无法呈现宏:
dag = DAG(
'temp',
default_args=default_args,
description='temp dag',
schedule_interval=timedelta(days=1))
def temp_def(a, b, **kwargs):
print '{{ds}}'
print '{{execution_date}}'
print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs))
ds = '{{ ds }}'
mm = '{{ execution_date }}'
t1 = PythonOperator(
task_id='temp_task',
python_callable=temp_def,
op_args=[mm , ds],
provide_context=False,
dag=dag)
宏仅针对模板化字段进行处理。要让 Jinja 处理此字段,请使用您自己的扩展 PythonOperator
。
class MyPythonOperator(PythonOperator):
template_fields = ('templates_dict','op_args')
我将 'templates_dict'
添加到 template_fields
中,因为 PythonOperator
本身已将此字段模板化:
PythonOperator
现在您应该可以在该字段中使用宏了:
ds = '{{ ds }}'
mm = '{{ execution_date }}'
t1 = MyPythonOperator(
task_id='temp_task',
python_callable=temp_def,
op_args=[mm , ds],
provide_context=False,
dag=dag)
在我看来,一种更原生的 Airflow 方法是使用包含的 PythonOperator 并使用 provide_context=True
参数。
t1 = MyPythonOperator(
task_id='temp_task',
python_callable=temp_def,
provide_context=True,
dag=dag)
现在您可以访问可调用对象
的kwargs
中的所有宏、气流元数据和任务参数
def temp_def(**kwargs):
print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date']))
如果您有一些自定义的 params
与任务相关联,您也可以通过 kwargs['params']
访问它们
我可以在 PythonOperator 中使用宏吗?我尝试了以下操作,但无法呈现宏:
dag = DAG(
'temp',
default_args=default_args,
description='temp dag',
schedule_interval=timedelta(days=1))
def temp_def(a, b, **kwargs):
print '{{ds}}'
print '{{execution_date}}'
print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs))
ds = '{{ ds }}'
mm = '{{ execution_date }}'
t1 = PythonOperator(
task_id='temp_task',
python_callable=temp_def,
op_args=[mm , ds],
provide_context=False,
dag=dag)
宏仅针对模板化字段进行处理。要让 Jinja 处理此字段,请使用您自己的扩展 PythonOperator
。
class MyPythonOperator(PythonOperator):
template_fields = ('templates_dict','op_args')
我将 'templates_dict'
添加到 template_fields
中,因为 PythonOperator
本身已将此字段模板化:
PythonOperator
现在您应该可以在该字段中使用宏了:
ds = '{{ ds }}'
mm = '{{ execution_date }}'
t1 = MyPythonOperator(
task_id='temp_task',
python_callable=temp_def,
op_args=[mm , ds],
provide_context=False,
dag=dag)
在我看来,一种更原生的 Airflow 方法是使用包含的 PythonOperator 并使用 provide_context=True
参数。
t1 = MyPythonOperator(
task_id='temp_task',
python_callable=temp_def,
provide_context=True,
dag=dag)
现在您可以访问可调用对象
的kwargs
中的所有宏、气流元数据和任务参数
def temp_def(**kwargs):
print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date']))
如果您有一些自定义的 params
与任务相关联,您也可以通过 kwargs['params']