execution_date in airflow: 需要作为变量访问

execution_date in airflow: need to access as a variable

我真的是这个论坛的新手。但我一直在为我们的公司玩弄气流。对不起,如果这个问题听起来很愚蠢。

我正在使用一堆 BashOperators 编写管道。 基本上,对于每个任务,我只想使用 'curl'

调用 REST api

这是我的管道的样子(非常简化的版本):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

如果你注意到我在做 current_datetime= datetime_obj.now(tz=tz.tzlocal()) 相反,我想要的是 'execution_date'

如何直接使用 'execution_date' 并将其分配给 python 文件中的变量?

我遇到了访问参数的一般问题。 任何帮助将不胜感激。

谢谢

PythonOperator 构造函数采用 'provide_context' 参数(参见 https://pythonhosted.org/airflow/code.html)。如果它是 True,那么它将通过 kwargs 将一些参数传递给 python_callable。 kwargs['execution_date'] 是你想要的,我相信。

像这样:

def python_method(ds, **kwargs):
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)

我不确定如何使用 BashOperator 执行此操作,但您可能会从这个问题开始:https://github.com/airbnb/airflow/issues/775

BashOperatorbash_command 参数 是一个 模板 。您可以使用 execution_date 变量在任何模板中将 execution_date 作为 datetime object 访问。在模板中,您可以使用任何jinja2方法对其进行操作。

使用以下内容作为您的 BashOperator bash_command string:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

如果你只想要执行日期的字符串等价物,ds将return一个日期戳(YYYY-MM-DD),ds_nodash returns相同没有破折号 (YYYYMMDD) 等。有关 macros 的更多信息,请参见 Api Docs.


您的最终运算符将如下所示:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)

我认为您不能为任务实例之外的气流上下文中的变量分配值,它们仅在 运行 时可用。在气流中加载和执行 dag 时,基本上有 2 个不同的步骤:

  • 首先解释和解析您的 dag 文件。它必须工作和编译并且任务定义必须正确(没有语法错误或任何东西)。在此步骤中,如果您调用函数来填充某些值,这些函数将无法访问气流上下文(例如,执行日期,如果您正在做一些回填,则更是如此)。

  • 第二步是dag的执行。只有在第二步中,airflow (execution_date, ds, etc...) 提供的变量才可用,因为它们与 dag 的执行有关。

因此您无法使用 Airflow 上下文初始化全局变量,但是,Airflow 为您提供了多种机制来实现相同的效果:

  1. 在您的命令中使用 jinja 模板(它可以在代码中的字符串中或在文件中,两者都会被处理)。您在这里有可用模板的列表:https://airflow.apache.org/macros.html#default-variables。请注意,一些函数也可用,特别是计算天数增量和日期格式。

  2. 使用 PythonOperator 在其中传递上下文(使用 provide_context 参数)。这将允许您使用语法 kwargs['<variable_name'] 访问相同的模板。如果需要,您可以 return 来自 PythonOperator 的值,该值将存储在 XCOM 变量中,您稍后可以在任何模板中使用。使用此语法访问 XCOM 变量:https://airflow.apache.org/concepts.html#xcoms

  3. 如果您编写自己的运算符,则可以使用字典 context.

  4. 访问气流变量
def execute(self, context):
    execution_date = context.get("execution_date")

这应该在 Operator

的 execute() 方法中

要在 PythonOperator 的可调用函数中打印执行日期,您可以在 Airflow 脚本中使用以下内容,也可以添加 start_timeend_time,如下所示:

def python_func(**kwargs):
    ts = kwargs["execution_date"]
    end_time = str(ts)
    start_time = str(ts.add(minutes=-30))

我已将日期时间值转换为字符串,因为我需要在 SQL 查询中传递它。我们也可以使用它。

你可以考虑SimpleHttpOperatorhttps://airflow.apache.org/_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator。发起http请求就这么简单。您可以通过模板将 execution_date 与端点参数一起传递。

这是另一种没有上下文的方式。使用 dag 的上次执行时间对于计划的 ETL 作业非常有帮助。比如那个'downloads all newly added files'的dag。不要硬编码 datetime.datetime,而是使用 dag 的最后执行日期作为时间过滤器。

Airflow Dags 实际上有一个名为 DagRun 的 class,可以像这样访问它:dag_runs = DagRun.find(dag_id=dag_id)

这是获取最近 运行 的执行时间的简单方法:

def get_most_recent_dag_run(dag_id):
dag_runs = DagRun.find(dag_id=dag_id)
dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
return dag_runs[1] if len(dag_runs) > 1 else None

然后,在您的 pythonOperator 中,您可以通过调用上面创建的函数动态访问 dag 的最后一次执行:

last_execution = get_most_recent_dag_run('svb_to_s3')

现在它是一个变量!