使自定义 Airflow 宏扩展其他宏
Make custom Airflow macros expand other macros
有没有办法在 Airflow 中创建一个用户定义的宏,它本身是从其他宏计算出来的?
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# DAG scheduled by the cron expression '0 21 * * *'. The user-defined macro
# below is a plain string that happens to contain Jinja syntax; the rendered
# output shown after this snippet demonstrates that it is NOT expanded again.
dag = DAG(
    dag_id='simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

# Echo the macro value; the nested template inside it stays unrendered.
task = BashOperator(
    dag=dag,
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
)
这里的用例是向后移植新的 Airflow v1.8 next_execution_date
宏以在 Airflow v1.7 中工作。不幸的是,此模板在没有宏扩展的情况下呈现:
$ airflow render simple bash_op 2017-08-09 21:00:00
# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "{{ dag.following_schedule(execution_date) }}"
user_defined_macros 默认不作为模板处理。如果你想在 user_defined_macros 中保留模板(或者如果你在 params 变量中使用模板),你总是可以手动重新运行模板函数:
class DoubleTemplatedBashOperator(BashOperator):
    """BashOperator that triggers one extra template rendering pass."""

    def pre_execute(self, context):
        """Re-run template rendering on the task instance in ``context['ti']``.

        Args:
            context (dict): template context; must contain the key ``'ti'``.
        """
        # The first pass leaves Jinja text that came from macro/param values
        # untouched; rendering again expands that second level.
        context['ti'].render_templates()
这适用于不引用其他参数或 UDM 的模板。这样,您可以拥有 "two-deep" 个模板。
或者将您的 UDM 直接放在 BashOperator
的命令中(最简单的解决方案):
# The expression is written straight into the templated command, so no
# user-defined macro is involved.
BashOperator(
    dag=dag,
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
)
以下是一些解决方案:
1。覆盖 BashOperator
以将一些值添加到上下文
class NextExecutionDateAwareBashOperator(BashOperator):
    """BashOperator that adds ``next_execution_date`` to the template context."""

    def render_template(self, attr, content, context):
        """Inject ``next_execution_date`` into ``context``, then render normally.

        Args:
            attr: forwarded unchanged to the parent implementation.
            content: forwarded unchanged to the parent implementation.
            context (dict): template context; must contain ``'dag'`` and
                ``'execution_date'``. It is mutated in place.

        Returns:
            Whatever the parent ``render_template`` returns.
        """
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        # Delegate to the parent method of the SAME name (``render_template``,
        # singular); the operator base class has no ``render_templates``.
        # In Python 2 spell the call as:
        #   super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)
        return super().render_template(attr, content, context)
这种方法的好处在于:您可以在自定义运算符中捕获一些重复的代码。
不好的部分:在呈现模板化字段之前,您必须编写一个自定义运算符来向上下文添加值。
2。在用户定义的宏中进行计算
Macros 不一定是值。它们可以是函数。
在你的 DAG 中:
def compute_next_execution_date(dag, execution_date):
    """Return the schedule date that follows ``execution_date`` for ``dag``.

    Args:
        dag: object exposing ``following_schedule(date)``.
        execution_date: the date passed through to ``following_schedule``.

    Returns:
        The value returned by ``dag.following_schedule(execution_date)``.
    """
    return dag.following_schedule(execution_date)
# The macro is registered as a callable rather than a value, so the template
# below invokes it with the objects it needs.
dag = DAG(
    dag_id='simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

# Call the macro from the templated command, passing ``dag`` and
# ``execution_date`` from the template context.
task = BashOperator(
    dag=dag,
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
)
好的部分:您可以定义可重用函数来处理运行时可用的值(XCom values、作业实例属性、任务实例属性等),并使您的函数结果可用于呈现模板。
不好的部分(但不是那么烦人):你必须在每个需要的 dag 中导入这样的函数作为用户定义的宏。
3。直接在模板中调用语句
此解决方案是最简单的(如上面的回答所述),并且可能是您的情况下的最佳解决方案。
# Simplest variant: evaluate the expression directly in the template.
BashOperator(
    dag=dag,
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
)
非常适合像这样的简单调用。还有其他一些对象可以像宏一样直接使用(如 task、task_instance 等);甚至还有一些标准模块可用(例如 macros.time 等)。
我会投票支持让 Airflow 插件注入您预定义的宏。
使用此方法,您可以在任何 Operator 中使用预定义的宏而无需声明任何内容。
下面是我们正在使用的一些自定义宏。
使用示例:{{ macros.dagtz_next_execution_date(ti) }}
from airflow.plugins_manager import AirflowPlugin
from datetime import datetime, timedelta
from airflow.utils.db import provide_session
from airflow.models import DagRun
import pendulum
@provide_session
def _get_dag_run(ti, session=None):
    """Get DagRun obj of the TaskInstance ti

    Args:
        ti (TaskInstance): the TaskInstance object
        session (optional): database session used for the DagRun query;
            expected to be supplied by the ``provide_session`` decorator
            when the caller does not pass one

    Returns:
        DagRun obj: the DagRun obj of the TaskInstance ti, or None when the
        task has no ``dag`` attribute or no matching row exists
    """
    task = ti.task
    dag_run = None
    if hasattr(task, 'dag'):
        # A DagRun is looked up by its dag_id and execution_date.
        dag_run = (
            session.query(DagRun)
            .filter_by(
                dag_id=task.dag.dag_id,
                execution_date=ti.execution_date)
            .first()
        )
        # NOTE(review): presumably detaches the loaded object so it stays
        # usable once the session is released - confirm.
        session.expunge_all()
        session.commit()
    return dag_run
def ds_add_no_dash(ds, days):
    """
    Add or subtract days from a YYYYMMDD

    :param ds: anchor date in ``YYYYMMDD`` format to add to
    :type ds: str
    :param days: number of days to add to the ds, you can use negative values
    :type days: int
    :return: the shifted date, formatted as ``YYYYMMDD``
    :rtype: str

    >>> ds_add_no_dash('20150101', 5)
    '20150106'
    >>> ds_add_no_dash('20150106', -5)
    '20150101'
    """
    anchor = datetime.strptime(ds, '%Y%m%d')
    # A falsy ``days`` (0 or None) leaves the anchor date unchanged.
    if days:
        anchor = anchor + timedelta(days)
    # ISO date is ``YYYY-MM-DD``; dropping the dashes yields ``YYYYMMDD``.
    return anchor.date().isoformat().replace('-', '')
def dagtz_execution_date(ti):
    """get the TaskInstance execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: execution_date in pendulum object (in DAG tz)
    """
    execution_date_pdl = pendulum.instance(ti.execution_date)
    # Convert into the timezone configured on the task's DAG.
    return execution_date_pdl.in_timezone(ti.task.dag.timezone)
def dagtz_next_execution_date(ti):
    """get the TaskInstance next execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: next execution_date in pendulum object (in DAG tz)
    """
    # For manually triggered dagruns that aren't run on a schedule, next/previous
    # schedule dates don't make sense, and should be set to execution date for
    # consistency with how execution_date is set for manually triggered tasks, i.e.
    # triggered_date == execution_date.
    dag_run = _get_dag_run(ti)
    if dag_run and dag_run.external_trigger:
        next_execution_date = ti.execution_date
    else:
        next_execution_date = ti.task.dag.following_schedule(ti.execution_date)

    next_execution_date_pdl = pendulum.instance(next_execution_date)
    return next_execution_date_pdl.in_timezone(ti.task.dag.timezone)
def dagtz_next_ds(ti):
    """get the TaskInstance next execution date (in DAG timezone) in YYYY-MM-DD string

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        str: result of ``dagtz_next_execution_date(ti)`` formatted as ``%Y-%m-%d``
    """
    return dagtz_next_execution_date(ti).strftime('%Y-%m-%d')
def dagtz_next_ds_nodash(ti):
    """get the TaskInstance next execution date (in DAG timezone) in YYYYMMDD string

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        str: result of ``dagtz_next_ds(ti)`` with the dashes removed
    """
    return dagtz_next_ds(ti).replace('-', '')
def dagtz_prev_execution_date(ti):
    """get the TaskInstance previous execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: previous execution_date in pendulum object (in DAG tz)
    """
    # For manually triggered dagruns that aren't run on a schedule, next/previous
    # schedule dates don't make sense, and should be set to execution date for
    # consistency with how execution_date is set for manually triggered tasks, i.e.
    # triggered_date == execution_date.
    dag_run = _get_dag_run(ti)
    if dag_run and dag_run.external_trigger:
        prev_execution_date = ti.execution_date
    else:
        prev_execution_date = ti.task.dag.previous_schedule(ti.execution_date)

    prev_execution_date_pdl = pendulum.instance(prev_execution_date)
    return prev_execution_date_pdl.in_timezone(ti.task.dag.timezone)
def dagtz_prev_ds(ti):
    """get the TaskInstance prev execution date (in DAG timezone) in YYYY-MM-DD string

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        str: result of ``dagtz_prev_execution_date(ti)`` formatted as ``%Y-%m-%d``
    """
    return dagtz_prev_execution_date(ti).strftime('%Y-%m-%d')
def dagtz_prev_ds_nodash(ti):
    """get the TaskInstance prev execution date (in DAG timezone) in YYYYMMDD string

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        str: result of ``dagtz_prev_ds(ti)`` with the dashes removed
    """
    return dagtz_prev_ds(ti).replace('-', '')
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    """Plugin named ``custom_macros`` that lists the date helpers as macros."""

    name = "custom_macros"
    # Functions listed in the plugin's ``macros`` attribute.
    macros = [
        dagtz_execution_date,
        ds_add_no_dash,
        dagtz_next_execution_date,
        dagtz_next_ds,
        dagtz_next_ds_nodash,
        dagtz_prev_execution_date,
        dagtz_prev_ds,
        dagtz_prev_ds_nodash,
    ]
以上方法对我都不起作用,所以我是这样做的:我使用了 user_defined_macros,
但我将所有模板变量传递给我的宏,然后使用 jinja 呈现结果
# Jinja call expression that passes each listed template variable to the
# ``config`` macro in a dict keyed by the variable's own name.
MACRO_CONFIG = 'config({%s})' % ', '.join(
    '"%s": %s' % (variable, variable)
    for variable in (
        'data_interval_start',
        'data_interval_end',
        'ds',
        'ds_nodash',
        'ts',
        'ts_nodash_with_tz',
        'ts_nodash',
        'prev_data_interval_start_success',
        'prev_data_interval_end_success',
        'dag',
        'task',
        'macros',
        'task_instance',
        'ti',
        'params',
        'conn',
        'task_instance_key_str',
        'conf',
        'run_id',
        'dag_run',
        'test_mode',
    )
)
def config_macro(context):
    """Wrap ``context`` in a ``FunctionThatReturnsTemplates`` instance.

    Args:
        context (dict): template variables handed over by the template.

    Returns:
        FunctionThatReturnsTemplates: object constructed from ``context``.
    """
    return FunctionThatReturnsTemplates(context)
# Register ``config_macro`` under the template name ``config``.
with DAG(
    'my-dag-id',
    schedule_interval=None,
    start_date=days_ago(1),
    user_defined_macros={'config': config_macro},
) as dag:
    ...
def config_macro_template(attr_name):
    """Build a Jinja expression that reads ``attr_name`` off the ``config`` macro result.

    Args:
        attr_name (str): attribute or call to append, e.g. ``'function(args)'``.

    Returns:
        str: ``'{{' + MACRO_CONFIG + '.' + attr_name + '}}'``.
    """
    return '{{' + MACRO_CONFIG + '.' + attr_name + '}}'
class FunctionThatReturnsTemplates(object):
    """Object whose string attributes and string call results are Jinja-rendered.

    Every attribute access goes through ``__getattribute__``:

    * a callable attribute is returned wrapped, so a string returned by the
      call is rendered as a template with ``self.context``;
    * a string attribute is rendered with ``self.context`` before it is returned;
    * anything else is returned unchanged.
    """

    # ``unicode`` exists only on Python 2. Referencing it unguarded raises
    # NameError on Python 3 as soon as a non-``str`` value is checked.
    try:
        _TEXT_TYPES = (str, unicode)  # noqa: F821
    except NameError:
        _TEXT_TYPES = (str,)

    def __getattribute__(self, name):
        """Return attribute ``name``, rendering string values as templates."""
        attr = object.__getattribute__(self, name)
        # Looked up through ``object`` so this hook is not re-entered.
        text_types = object.__getattribute__(self, '_TEXT_TYPES')
        logging.info('attr')
        logging.info(attr)
        logging.info("type(attr)")
        logging.info(type(attr))
        if callable(attr):
            logging.info('method attr')
            # Not every callable has ``__name__``; fall back to the attribute name.
            label = getattr(attr, '__name__', name)

            def render_result(*args, **kwargs):
                logging.info('before calling %s', label)
                result = attr(*args, **kwargs)
                logging.info('done calling %s', label)
                if isinstance(result, text_types):
                    return Template(result).render(**self.context)
                return result

            return render_result
        logging.info('attr is not method')
        if isinstance(attr, text_types):
            logging.info('attr is string or unicode')
            result = Template(attr).render(**self.context)
            logging.info(result)
            logging.info("result")
            return result
        return attr

    def __init__(self, context):
        """Store ``context``, the mapping later used to render templates."""
        logging.info('from sampling pipeline context')
        logging.info(context)
        self.context = context
...
# The templated field receives a Jinja expression of the form
# ``{{config({...}).function(args)}}`` built by ``config_macro_template``.
my_task = SomeOperator(
    task_id='my-task-id',
    templated_field=config_macro_template('function(args)'),
)
有没有办法在 Airflow 中创建一个用户定义的宏,它本身是从其他宏计算出来的?
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# DAG scheduled by the cron expression '0 21 * * *'. The user-defined macro
# below is a plain string that happens to contain Jinja syntax; the rendered
# output shown after this snippet demonstrates that it is NOT expanded again.
dag = DAG(
    dag_id='simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

# Echo the macro value; the nested template inside it stays unrendered.
task = BashOperator(
    dag=dag,
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
)
这里的用例是向后移植新的 Airflow v1.8 next_execution_date
宏以在 Airflow v1.7 中工作。不幸的是,此模板在没有宏扩展的情况下呈现:
$ airflow render simple bash_op 2017-08-09 21:00:00
# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "{{ dag.following_schedule(execution_date) }}"
user_defined_macros 默认不作为模板处理。如果你想在 user_defined_macros 中保留模板(或者如果你在 params 变量中使用模板),你总是可以手动重新运行模板函数:
class DoubleTemplatedBashOperator(BashOperator):
    """BashOperator that triggers one extra template rendering pass."""

    def pre_execute(self, context):
        """Re-run template rendering on the task instance in ``context['ti']``.

        Args:
            context (dict): template context; must contain the key ``'ti'``.
        """
        # The first pass leaves Jinja text that came from macro/param values
        # untouched; rendering again expands that second level.
        context['ti'].render_templates()
这适用于不引用其他参数或 UDM 的模板。这样,您可以拥有 "two-deep" 个模板。
或者将您的 UDM 直接放在 BashOperator
的命令中(最简单的解决方案):
# The expression is written straight into the templated command, so no
# user-defined macro is involved.
BashOperator(
    dag=dag,
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
)
以下是一些解决方案:
1。覆盖 BashOperator
以将一些值添加到上下文
class NextExecutionDateAwareBashOperator(BashOperator):
    """BashOperator that adds ``next_execution_date`` to the template context."""

    def render_template(self, attr, content, context):
        """Inject ``next_execution_date`` into ``context``, then render normally.

        Args:
            attr: forwarded unchanged to the parent implementation.
            content: forwarded unchanged to the parent implementation.
            context (dict): template context; must contain ``'dag'`` and
                ``'execution_date'``. It is mutated in place.

        Returns:
            Whatever the parent ``render_template`` returns.
        """
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        # Delegate to the parent method of the SAME name (``render_template``,
        # singular); the operator base class has no ``render_templates``.
        # In Python 2 spell the call as:
        #   super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)
        return super().render_template(attr, content, context)
这种方法的好处在于:您可以在自定义运算符中捕获一些重复的代码。
不好的部分:在呈现模板化字段之前,您必须编写一个自定义运算符来向上下文添加值。
2。在用户定义的宏中进行计算
Macros 不一定是值。它们可以是函数。
在你的 DAG 中:
def compute_next_execution_date(dag, execution_date):
    """Return the schedule date that follows ``execution_date`` for ``dag``.

    Args:
        dag: object exposing ``following_schedule(date)``.
        execution_date: the date passed through to ``following_schedule``.

    Returns:
        The value returned by ``dag.following_schedule(execution_date)``.
    """
    return dag.following_schedule(execution_date)
# The macro is registered as a callable rather than a value, so the template
# below invokes it with the objects it needs.
dag = DAG(
    dag_id='simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

# Call the macro from the templated command, passing ``dag`` and
# ``execution_date`` from the template context.
task = BashOperator(
    dag=dag,
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
)
好的部分:您可以定义可重用函数来处理运行时可用的值(XCom values、作业实例属性、任务实例属性等),并使您的函数结果可用于呈现模板。
不好的部分(但不是那么烦人):你必须在每个需要的 dag 中导入这样的函数作为用户定义的宏。
3。直接在模板中调用语句
此解决方案是最简单的(如上面的回答所述),并且可能是您的情况下的最佳解决方案。
# Simplest variant: evaluate the expression directly in the template.
BashOperator(
    dag=dag,
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
)
非常适合像这样的简单调用。还有其他一些对象可以像宏一样直接使用(如 task、task_instance 等);甚至还有一些标准模块可用(例如 macros.time 等)。
我会投票支持让 Airflow 插件注入您预定义的宏。 使用此方法,您可以在任何 Operator 中使用预定义的宏而无需声明任何内容。
下面是我们正在使用的一些自定义宏。
使用示例:{{ macros.dagtz_next_execution_date(ti) }}
from airflow.plugins_manager import AirflowPlugin
from datetime import datetime, timedelta
from airflow.utils.db import provide_session
from airflow.models import DagRun
import pendulum
@provide_session
def _get_dag_run(ti, session=None):
    """Get DagRun obj of the TaskInstance ti

    Args:
        ti (TaskInstance): the TaskInstance object
        session (optional): database session used for the DagRun query;
            expected to be supplied by the ``provide_session`` decorator
            when the caller does not pass one

    Returns:
        DagRun obj: the DagRun obj of the TaskInstance ti, or None when the
        task has no ``dag`` attribute or no matching row exists
    """
    task = ti.task
    dag_run = None
    if hasattr(task, 'dag'):
        # A DagRun is looked up by its dag_id and execution_date.
        dag_run = (
            session.query(DagRun)
            .filter_by(
                dag_id=task.dag.dag_id,
                execution_date=ti.execution_date)
            .first()
        )
        # NOTE(review): presumably detaches the loaded object so it stays
        # usable once the session is released - confirm.
        session.expunge_all()
        session.commit()
    return dag_run
def ds_add_no_dash(ds, days):
    """
    Add or subtract days from a YYYYMMDD

    :param ds: anchor date in ``YYYYMMDD`` format to add to
    :type ds: str
    :param days: number of days to add to the ds, you can use negative values
    :type days: int
    :return: the shifted date, formatted as ``YYYYMMDD``
    :rtype: str

    >>> ds_add_no_dash('20150101', 5)
    '20150106'
    >>> ds_add_no_dash('20150106', -5)
    '20150101'
    """
    anchor = datetime.strptime(ds, '%Y%m%d')
    # A falsy ``days`` (0 or None) leaves the anchor date unchanged.
    if days:
        anchor = anchor + timedelta(days)
    # ISO date is ``YYYY-MM-DD``; dropping the dashes yields ``YYYYMMDD``.
    return anchor.date().isoformat().replace('-', '')
def dagtz_execution_date(ti):
    """get the TaskInstance execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: execution_date in pendulum object (in DAG tz)
    """
    execution_date_pdl = pendulum.instance(ti.execution_date)
    # Convert into the timezone configured on the task's DAG.
    return execution_date_pdl.in_timezone(ti.task.dag.timezone)
def dagtz_next_execution_date(ti):
    """get the TaskInstance next execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: next execution_date in pendulum object (in DAG tz)
    """
    # For manually triggered dagruns that aren't run on a schedule, next/previous
    # schedule dates don't make sense, and should be set to execution date for
    # consistency with how execution_date is set for manually triggered tasks, i.e.
    # triggered_date == execution_date.
    dag_run = _get_dag_run(ti)
    if dag_run and dag_run.external_trigger:
        next_execution_date = ti.execution_date
    else:
        next_execution_date = ti.task.dag.following_schedule(ti.execution_date)

    next_execution_date_pdl = pendulum.instance(next_execution_date)
    return next_execution_date_pdl.in_timezone(ti.task.dag.timezone)
def dagtz_next_ds(ti):
    """get the TaskInstance next execution date (in DAG timezone) in YYYY-MM-DD string

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        str: result of ``dagtz_next_execution_date(ti)`` formatted as ``%Y-%m-%d``
    """
    return dagtz_next_execution_date(ti).strftime('%Y-%m-%d')
def dagtz_next_ds_nodash(ti):
    """get the TaskInstance next execution date (in DAG timezone) in YYYYMMDD string

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        str: result of ``dagtz_next_ds(ti)`` with the dashes removed
    """
    return dagtz_next_ds(ti).replace('-', '')
def dagtz_prev_execution_date(ti):
    """get the TaskInstance previous execution date (in DAG timezone) in pendulum obj

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: previous execution_date in pendulum object (in DAG tz)
    """
    # For manually triggered dagruns that aren't run on a schedule, next/previous
    # schedule dates don't make sense, and should be set to execution date for
    # consistency with how execution_date is set for manually triggered tasks, i.e.
    # triggered_date == execution_date.
    dag_run = _get_dag_run(ti)
    if dag_run and dag_run.external_trigger:
        prev_execution_date = ti.execution_date
    else:
        prev_execution_date = ti.task.dag.previous_schedule(ti.execution_date)

    prev_execution_date_pdl = pendulum.instance(prev_execution_date)
    return prev_execution_date_pdl.in_timezone(ti.task.dag.timezone)
def dagtz_prev_ds(ti):
    """get the TaskInstance prev execution date (in DAG timezone) in YYYY-MM-DD string

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        str: result of ``dagtz_prev_execution_date(ti)`` formatted as ``%Y-%m-%d``
    """
    return dagtz_prev_execution_date(ti).strftime('%Y-%m-%d')
def dagtz_prev_ds_nodash(ti):
    """get the TaskInstance prev execution date (in DAG timezone) in YYYYMMDD string

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        str: result of ``dagtz_prev_ds(ti)`` with the dashes removed
    """
    return dagtz_prev_ds(ti).replace('-', '')
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    """Plugin named ``custom_macros`` that lists the date helpers as macros."""

    name = "custom_macros"
    # Functions listed in the plugin's ``macros`` attribute.
    macros = [
        dagtz_execution_date,
        ds_add_no_dash,
        dagtz_next_execution_date,
        dagtz_next_ds,
        dagtz_next_ds_nodash,
        dagtz_prev_execution_date,
        dagtz_prev_ds,
        dagtz_prev_ds_nodash,
    ]
以上方法对我都不起作用,所以我是这样做的:我使用了 user_defined_macros,
但我将所有模板变量传递给我的宏,然后使用 jinja 呈现结果
# Jinja call expression that passes each listed template variable to the
# ``config`` macro in a dict keyed by the variable's own name.
MACRO_CONFIG = 'config({%s})' % ', '.join(
    '"%s": %s' % (variable, variable)
    for variable in (
        'data_interval_start',
        'data_interval_end',
        'ds',
        'ds_nodash',
        'ts',
        'ts_nodash_with_tz',
        'ts_nodash',
        'prev_data_interval_start_success',
        'prev_data_interval_end_success',
        'dag',
        'task',
        'macros',
        'task_instance',
        'ti',
        'params',
        'conn',
        'task_instance_key_str',
        'conf',
        'run_id',
        'dag_run',
        'test_mode',
    )
)
def config_macro(context):
    """Wrap ``context`` in a ``FunctionThatReturnsTemplates`` instance.

    Args:
        context (dict): template variables handed over by the template.

    Returns:
        FunctionThatReturnsTemplates: object constructed from ``context``.
    """
    return FunctionThatReturnsTemplates(context)
# Register ``config_macro`` under the template name ``config``.
with DAG(
    'my-dag-id',
    schedule_interval=None,
    start_date=days_ago(1),
    user_defined_macros={'config': config_macro},
) as dag:
    ...
def config_macro_template(attr_name):
    """Build a Jinja expression that reads ``attr_name`` off the ``config`` macro result.

    Args:
        attr_name (str): attribute or call to append, e.g. ``'function(args)'``.

    Returns:
        str: ``'{{' + MACRO_CONFIG + '.' + attr_name + '}}'``.
    """
    return '{{' + MACRO_CONFIG + '.' + attr_name + '}}'
class FunctionThatReturnsTemplates(object):
    """Object whose string attributes and string call results are Jinja-rendered.

    Every attribute access goes through ``__getattribute__``:

    * a callable attribute is returned wrapped, so a string returned by the
      call is rendered as a template with ``self.context``;
    * a string attribute is rendered with ``self.context`` before it is returned;
    * anything else is returned unchanged.
    """

    # ``unicode`` exists only on Python 2. Referencing it unguarded raises
    # NameError on Python 3 as soon as a non-``str`` value is checked.
    try:
        _TEXT_TYPES = (str, unicode)  # noqa: F821
    except NameError:
        _TEXT_TYPES = (str,)

    def __getattribute__(self, name):
        """Return attribute ``name``, rendering string values as templates."""
        attr = object.__getattribute__(self, name)
        # Looked up through ``object`` so this hook is not re-entered.
        text_types = object.__getattribute__(self, '_TEXT_TYPES')
        logging.info('attr')
        logging.info(attr)
        logging.info("type(attr)")
        logging.info(type(attr))
        if callable(attr):
            logging.info('method attr')
            # Not every callable has ``__name__``; fall back to the attribute name.
            label = getattr(attr, '__name__', name)

            def render_result(*args, **kwargs):
                logging.info('before calling %s', label)
                result = attr(*args, **kwargs)
                logging.info('done calling %s', label)
                if isinstance(result, text_types):
                    return Template(result).render(**self.context)
                return result

            return render_result
        logging.info('attr is not method')
        if isinstance(attr, text_types):
            logging.info('attr is string or unicode')
            result = Template(attr).render(**self.context)
            logging.info(result)
            logging.info("result")
            return result
        return attr

    def __init__(self, context):
        """Store ``context``, the mapping later used to render templates."""
        logging.info('from sampling pipeline context')
        logging.info(context)
        self.context = context
...
# The templated field receives a Jinja expression of the form
# ``{{config({...}).function(args)}}`` built by ``config_macro_template``.
my_task = SomeOperator(
    task_id='my-task-id',
    templated_field=config_macro_template('function(args)'),
)