在 dag 中递增或递减 ts_nodash 秒

increment or decrement ts_nodash seconds in dag

有没有办法在气流 dag 中增加或减少 {{ts_nodash}} 中的秒数? 由于 {{ts_nodash}} 获得的相同日期时间,我的输出被覆盖了。有没有办法增加 {{ts_nodash}} 1-2 秒 就像我们可以在 {{ ds }} -

中做几天一样
airflow.macros.ds_add(ds, days) 

ds_add('2015-01-01', 5)

尝试使用钟摆日期宏而不是日期字符串。您可以操纵日期并将其转回字符串

import pendulum
....
execution_date.subtract(seconds=2).to_datetime_string() 

有两种方法可以解决这个问题。

  1. ds_add and put it in your plugin 一样创建自己的宏。
  2. 在 Jinja 模板中使用 python 代码。

选项 2 最简单直接。

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="trigger_dag",
    start_date=datetime(2021, 3, 10),
    catchup=True,
    schedule_interval='@once',
)

with dag:
    op = PythonOperator(
        task_id='a',
        python_callable=lambda x, y: print(x, y),
        op_args=[
            '{{ ts_nodash }}',
            '{{ execution_date.subtract(seconds=2).strftime("%Y%m%dT%H%M%S")  }}',
        ],
    )

ts_nodash的获取方式如how the context is built所示。我们实际上是在将格式化为字符串的日期时间对象上构建。

这是任务的呈现模板视图。