Airflow TriggerDagRunOperator: how to change the execution date
I noticed that for scheduled tasks the execution date is set in the past, according to this explanation:
Airflow was developed as a solution for ETL needs. In the ETL world,
you typically summarize data. So, if I want to summarize data for
2016-02-19, I would do it at 2016-02-20 midnight GMT, which would be
right after all data for 2016-02-19 becomes available.
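However, when a dag triggers another dag, the execution time is set to now().
Is there a way to give the triggered dag the same execution time as the triggering dag? Of course, I could rewrite the template and use yesterday_ds, but that is a tricky solution.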
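To make the gap concrete, here is a minimal sketch of how the execution date of a daily scheduled run relates to the moment the run starts. It assumes the run starts exactly when its period ends; `scheduled_execution_date` is a hypothetical helper for illustration, not an Airflow function.

```python
from datetime import datetime, timedelta


def scheduled_execution_date(run_started_at, schedule_interval):
    """Return the start of the period that a scheduled run covers.

    Assumption: the run starts exactly one interval after the period began.
    """
    return run_started_at - schedule_interval


# The run that summarizes 2016-02-19 starts at midnight on 2016-02-20.
run_started_at = datetime(2016, 2, 20, 0, 0, 0)
execution_date = scheduled_execution_date(run_started_at, timedelta(days=1))
print(execution_date.isoformat())  # 2016-02-19T00:00:00
```

A run triggered with now() would instead carry the 2016-02-20 timestamp, so the two dags would disagree about which day they are processing.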
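The following class extends TriggerDagRunOperator to allow passing the execution date as a string, which is then converted back into a datetime. It is a bit hacky, but it is the only way I found to get the job done.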
from datetime import datetime
import logging

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder


class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicitly set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd)

    Usage Example:
    -------------------
    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{execution_date}}",
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        python_callable=lambda context, dro: dro if random.getrandbits(1) else None,
        params={},
        dag=my_controller_dag
    )
    """
    # Render execution_date with Jinja before execute() is called.
    template_fields = ('execution_date',)

    def __init__(
        self, trigger_dag_id, python_callable, execution_date,
        *args, **kwargs
    ):
        self.execution_date = execution_date
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
            *args, **kwargs
        )

    def execute(self, context):
        # The rendered template is a string; parse it to build the run_id.
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        # python_callable returns the DagRunOrder to trigger, or a falsy value to skip.
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                execution_date=self.execution_date,
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")
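The string handling at the top of execute() can be tried in isolation. The sketch below uses only the standard library; `build_run_id` is a hypothetical helper name, and the input format is assumed to be the same '%Y-%m-%d %H:%M:%S' pattern the operator expects from the rendered template.

```python
from datetime import datetime


def build_run_id(rendered_execution_date):
    """Parse the rendered template string and derive the run_id from it."""
    run_id_dt = datetime.strptime(rendered_execution_date, '%Y-%m-%d %H:%M:%S')
    return run_id_dt, 'trig__' + run_id_dt.isoformat()


run_id_dt, run_id = build_run_id('2016-02-19 00:00:00')
print(run_id)  # trig__2016-02-19T00:00:00
```

If the rendered string uses a different layout (for example one with a timezone offset), strptime raises ValueError, so the format string may need adjusting for your Airflow version.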
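One issue you may run into when using this without setting execution_date=now(): your operator will throw a MySQL error if you try to start the same dag with the same execution_date twice. This is because execution_date and dag_id are used to create the row index, and rows with identical indexes cannot be inserted.
I can't think of a reason you would ever want to run two identical dags with the same execution_date anyway, but it is something I ran into while testing, and you should not be alarmed by it. Simply clear the old job or use a different datetime.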
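The constraint can be reproduced without Airflow. The sketch below is a stand-in only: it uses SQLite instead of MySQL, and `dag_run_demo` is a made-up table with just the two columns that matter here, not Airflow's real schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified table: one row per (dag_id, execution_date).
conn.execute(
    "CREATE TABLE dag_run_demo ("
    " dag_id TEXT NOT NULL,"
    " execution_date TEXT NOT NULL,"
    " UNIQUE (dag_id, execution_date))"
)


def insert_run(conn, dag_id, execution_date):
    """Try to record a run; return False if the same key already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO dag_run_demo (dag_id, execution_date) VALUES (?, ?)",
                (dag_id, execution_date),
            )
        return True
    except sqlite3.IntegrityError:
        return False


first = insert_run(conn, "my_target_dag_id", "2016-02-19 00:00:00")
second = insert_run(conn, "my_target_dag_id", "2016-02-19 00:00:00")
other_date = insert_run(conn, "my_target_dag_id", "2016-02-20 00:00:00")
```

Here the first and third inserts succeed, while the second is rejected because it repeats the same dag_id and execution_date pair.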
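I improved the MMTTriggerDagRunOperator a bit. The function checks whether the dag_run already exists and, if it does, restarts the dag using Airflow's clear function. This lets us create dependencies between dags, because being able to carry the execution date over to the triggered dag opens up a whole universe of possibilities. I wonder why this is not the default behavior in Airflow.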
    def execute(self, context):
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            if not trigger_dag.get_dagrun(self.execution_date):
                # No run exists yet for this execution date: create one.
                dr = trigger_dag.create_dagrun(
                    run_id=dro.run_id,
                    state=State.RUNNING,
                    execution_date=self.execution_date,
                    conf=dro.payload,
                    external_trigger=True
                )
                logging.info("Creating DagRun {}".format(dr))
                session.add(dr)
                session.commit()
            else:
                # A run already exists: clear it so that it is restarted.
                trigger_dag.clear(
                    start_date=self.execution_date,
                    end_date=self.execution_date,
                    only_failed=False,
                    only_running=False,
                    confirm_prompt=False,
                    reset_dag_runs=True,
                    include_subdags=False,
                    dry_run=False
                )
                logging.info("Cleared DagRun {}".format(trigger_dag))
            session.close()
        else:
            logging.info("Criteria not met, moving on")
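The branching in this version boils down to "create the run if it is missing, otherwise reset it". A minimal in-memory sketch of that decision follows; the `runs` dictionary and `trigger_or_clear` are hypothetical stand-ins for the Airflow metadata database and the operator logic.

```python
def trigger_or_clear(runs, dag_id, execution_date):
    """Create a run for (dag_id, execution_date), or reset the existing one."""
    key = (dag_id, execution_date)
    if key not in runs:
        runs[key] = "running"
        return "created"
    # Clearing puts the existing run back into a state where it executes again.
    runs[key] = "running"
    return "cleared"


runs = {}
first = trigger_or_clear(runs, "my_target_dag_id", "2016-02-19 00:00:00")
runs[("my_target_dag_id", "2016-02-19 00:00:00")] = "success"  # the run finishes
second = trigger_or_clear(runs, "my_target_dag_id", "2016-02-19 00:00:00")
```

The second call no longer fails on the duplicate key; it restarts the run that already exists for that execution date.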
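The experimental API section of Airflow provides a function that lets you trigger a dag with a specific execution date.
https://github.com/apache/incubator-airflow/blob/master/airflow/api/common/experimental/trigger_dag.py
You can call this function as part of a PythonOperator and achieve the objective.
So it would look like: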
from datetime import datetime
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.operators.python_operator import PythonOperator

trigger_operator = PythonOperator(task_id='YOUR_TASK_ID',
                                  python_callable=trigger_dag,
                                  op_args=['dag_id'],
                                  op_kwargs={'execution_date': datetime.now()})
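Note that datetime.now() in op_kwargs is evaluated when the operator object is constructed, not when the task runs. If the goal is to reuse the triggering run's execution date, one option is a small wrapper that reads it from the task context. The sketch below assumes Airflow 1.x, where PythonOperator passes the context as keyword arguments when provide_context=True; `make_trigger_callable` and `fake_trigger_dag` are hypothetical names, the latter standing in for the real trigger_dag so the example runs without Airflow.

```python
from datetime import datetime


def make_trigger_callable(trigger_fn, dag_id):
    """Build a python_callable that forwards the run's execution_date."""
    def _trigger(**context):
        # Read the execution date at run time from the task context.
        return trigger_fn(dag_id, execution_date=context["execution_date"])
    return _trigger


calls = []


def fake_trigger_dag(dag_id, execution_date=None):
    """Stand-in for trigger_dag that only records what it was asked to do."""
    calls.append((dag_id, execution_date))
    return "triggered"


trigger_callable = make_trigger_callable(fake_trigger_dag, "dag_id")
# Simulate the keyword arguments a task context would supply.
result = trigger_callable(execution_date=datetime(2016, 2, 19), ds="2016-02-19")
```

In a real dag, the callable returned by make_trigger_callable(trigger_dag, 'dag_id') would be passed as python_callable, with op_args and op_kwargs left out.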
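TriggerDagRunOperator now has an execution_date parameter to set the execution date of the triggered run.
Unfortunately, the parameter is not currently in the template fields.
If it gets added to the template fields (or if you subclass the operator and change the template_fields value), it can be used like this: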
my_trigger_task = TriggerDagRunOperator(task_id='my_trigger_task',
                                        trigger_dag_id="triggered_dag_id",
                                        python_callable=conditionally_trigger,
                                        execution_date='{{execution_date}}',
                                        dag=dag)
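Why template_fields matters can be shown with a toy model. This is not Airflow's implementation: `ToyOperator` and its naive string replacement are assumptions made purely to illustrate that only attributes listed in template_fields get rendered.

```python
class ToyOperator:
    """Toy stand-in for an operator; renders only the listed attributes."""
    template_fields = ()

    def __init__(self, **attrs):
        for name, value in attrs.items():
            setattr(self, name, value)

    def render_template_fields(self, context):
        for name in self.template_fields:
            value = getattr(self, name)
            # Naive substitution instead of a real Jinja render.
            for key, replacement in context.items():
                value = value.replace("{{" + key + "}}", str(replacement))
            setattr(self, name, value)


class ToyTemplatedOperator(ToyOperator):
    # Overriding template_fields in a subclass opts the attribute in.
    template_fields = ("execution_date",)


context = {"execution_date": "2016-02-19 00:00:00"}
plain = ToyOperator(execution_date="{{execution_date}}")
templated = ToyTemplatedOperator(execution_date="{{execution_date}}")
plain.render_template_fields(context)
templated.render_template_fields(context)
```

After rendering, `plain.execution_date` is still the literal template string, while `templated.execution_date` holds the date from the context.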
It has not been released yet, but you can see the source here:
https://github.com/apache/incubator-airflow/blob/master/airflow/operators/dagrun_operator.py