execution_date in airflow: 需要作为变量访问
execution_date in airflow: need to access as a variable
我真的是这个论坛的新手。但我一直在为我们的公司玩弄气流。对不起,如果这个问题听起来很愚蠢。
我正在使用一堆 BashOperators 编写管道。
基本上,对于每个任务,我只想使用 'curl'
调用 REST api
这是我的管道的样子(非常简化的版本):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['xxxx@xxx.xxx'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
如果你注意到我在做 current_datetime= datetime_obj.now(tz=tz.tzlocal())
相反,我想要的是 'execution_date'
如何直接使用 'execution_date' 并将其分配给 python 文件中的变量?
我遇到了访问参数的一般问题。
任何帮助将不胜感激。
谢谢
PythonOperator 构造函数采用 'provide_context' 参数(参见 https://pythonhosted.org/airflow/code.html)。如果它是 True,那么它将通过 kwargs 将一些参数传递给 python_callable。 kwargs['execution_date'] 是你想要的,我相信。
像这样:
def python_method(ds, **kwargs):
Variable.set('execution_date', kwargs['execution_date'])
return
doit = PythonOperator(
task_id='doit',
provide_context=True,
python_callable=python_method,
dag=dag)
我不确定如何使用 BashOperator 执行此操作,但您可能会从这个问题开始:https://github.com/airbnb/airflow/issues/775
BashOperator
的 bash_command
参数 是一个 模板 。您可以使用 execution_date
变量在任何模板中将 execution_date
作为 datetime
object 访问。在模板中,您可以使用任何jinja2
方法对其进行操作。
使用以下内容作为您的 BashOperator
bash_command
string:
# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}
# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
如果你只想要执行日期的字符串等价物,ds
将return一个日期戳(YYYY-MM-DD),ds_nodash
returns相同没有破折号 (YYYYMMDD) 等。有关 macros
的更多信息,请参见 Api Docs.
您的最终运算符将如下所示:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
我认为您不能为任务实例之外的气流上下文中的变量分配值,它们仅在 运行 时可用。在气流中加载和执行 dag 时,基本上有 2 个不同的步骤:
首先解释和解析您的 dag 文件。它必须工作和编译并且任务定义必须正确(没有语法错误或任何东西)。在此步骤中,如果您调用函数来填充某些值,这些函数将无法访问气流上下文(例如,执行日期,如果您正在做一些回填,则更是如此)。
第二步是dag的执行。只有在第二步中,airflow (execution_date, ds, etc...
) 提供的变量才可用,因为它们与 dag 的执行有关。
因此您无法使用 Airflow 上下文初始化全局变量,但是,Airflow 为您提供了多种机制来实现相同的效果:
在您的命令中使用 jinja 模板(它可以在代码中的字符串中或在文件中,两者都会被处理)。您在这里有可用模板的列表:https://airflow.apache.org/macros.html#default-variables。请注意,一些函数也可用,特别是计算天数增量和日期格式。
使用 PythonOperator 在其中传递上下文(使用 provide_context
参数)。这将允许您使用语法 kwargs['<variable_name']
访问相同的模板。如果需要,您可以 return 来自 PythonOperator 的值,该值将存储在 XCOM 变量中,您稍后可以在任何模板中使用。使用此语法访问 XCOM 变量:https://airflow.apache.org/concepts.html#xcoms
如果您编写自己的运算符,则可以使用字典 context
.
访问气流变量
def execute(self, context):
execution_date = context.get("execution_date")
这应该在 Operator
的 execute() 方法中
要在 PythonOperator
的可调用函数中打印执行日期,您可以在 Airflow 脚本中使用以下内容,也可以添加 start_time
和 end_time
,如下所示:
def python_func(**kwargs):
ts = kwargs["execution_date"]
end_time = str(ts)
start_time = str(ts.add(minutes=-30))
我已将日期时间值转换为字符串,因为我需要在 SQL 查询中传递它。我们也可以使用它。
你可以考虑SimpleHttpOperatorhttps://airflow.apache.org/_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator。发起http请求就这么简单。您可以通过模板将 execution_date 与端点参数一起传递。
这是另一种没有上下文的方式。使用 dag 的上次执行时间对于计划的 ETL 作业非常有帮助。比如那个'downloads all newly added files'的dag。不要硬编码 datetime.datetime,而是使用 dag 的最后执行日期作为时间过滤器。
Airflow Dags 实际上有一个名为 DagRun 的 class,可以像这样访问它:dag_runs = DagRun.find(dag_id=dag_id)
这是获取最近 运行 的执行时间的简单方法:
def get_most_recent_dag_run(dag_id):
dag_runs = DagRun.find(dag_id=dag_id)
dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
return dag_runs[1] if len(dag_runs) > 1 else None
然后,在您的 pythonOperator 中,您可以通过调用上面创建的函数动态访问 dag 的最后一次执行:
last_execution = get_most_recent_dag_run('svb_to_s3')
现在它是一个变量!
我真的是这个论坛的新手。但我一直在为我们的公司玩弄气流。对不起,如果这个问题听起来很愚蠢。
我正在使用一堆 BashOperators 编写管道。 基本上,对于每个任务,我只想使用 'curl'
调用 REST api这是我的管道的样子(非常简化的版本):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['xxxx@xxx.xxx'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
如果你注意到我在做 current_datetime= datetime_obj.now(tz=tz.tzlocal())
相反,我想要的是 'execution_date'
如何直接使用 'execution_date' 并将其分配给 python 文件中的变量?
我遇到了访问参数的一般问题。 任何帮助将不胜感激。
谢谢
PythonOperator 构造函数采用 'provide_context' 参数(参见 https://pythonhosted.org/airflow/code.html)。如果它是 True,那么它将通过 kwargs 将一些参数传递给 python_callable。 kwargs['execution_date'] 是你想要的,我相信。
像这样:
def python_method(ds, **kwargs):
Variable.set('execution_date', kwargs['execution_date'])
return
doit = PythonOperator(
task_id='doit',
provide_context=True,
python_callable=python_method,
dag=dag)
我不确定如何使用 BashOperator 执行此操作,但您可能会从这个问题开始:https://github.com/airbnb/airflow/issues/775
BashOperator
的 bash_command
参数 是一个 模板 。您可以使用 execution_date
变量在任何模板中将 execution_date
作为 datetime
object 访问。在模板中,您可以使用任何jinja2
方法对其进行操作。
使用以下内容作为您的 BashOperator
bash_command
string:
# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}
# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
如果你只想要执行日期的字符串等价物,ds
将return一个日期戳(YYYY-MM-DD),ds_nodash
returns相同没有破折号 (YYYYMMDD) 等。有关 macros
的更多信息,请参见 Api Docs.
您的最终运算符将如下所示:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
我认为您不能为任务实例之外的气流上下文中的变量分配值,它们仅在 运行 时可用。在气流中加载和执行 dag 时,基本上有 2 个不同的步骤:
首先解释和解析您的 dag 文件。它必须工作和编译并且任务定义必须正确(没有语法错误或任何东西)。在此步骤中,如果您调用函数来填充某些值,这些函数将无法访问气流上下文(例如,执行日期,如果您正在做一些回填,则更是如此)。
第二步是dag的执行。只有在第二步中,airflow (
execution_date, ds, etc...
) 提供的变量才可用,因为它们与 dag 的执行有关。
因此您无法使用 Airflow 上下文初始化全局变量,但是,Airflow 为您提供了多种机制来实现相同的效果:
在您的命令中使用 jinja 模板(它可以在代码中的字符串中或在文件中,两者都会被处理)。您在这里有可用模板的列表:https://airflow.apache.org/macros.html#default-variables。请注意,一些函数也可用,特别是计算天数增量和日期格式。
使用 PythonOperator 在其中传递上下文(使用
provide_context
参数)。这将允许您使用语法kwargs['<variable_name']
访问相同的模板。如果需要,您可以 return 来自 PythonOperator 的值,该值将存储在 XCOM 变量中,您稍后可以在任何模板中使用。使用此语法访问 XCOM 变量:https://airflow.apache.org/concepts.html#xcoms如果您编写自己的运算符,则可以使用字典
context
. 访问气流变量
def execute(self, context):
execution_date = context.get("execution_date")
这应该在 Operator
的 execute() 方法中要在 PythonOperator
的可调用函数中打印执行日期,您可以在 Airflow 脚本中使用以下内容,也可以添加 start_time
和 end_time
,如下所示:
def python_func(**kwargs):
ts = kwargs["execution_date"]
end_time = str(ts)
start_time = str(ts.add(minutes=-30))
我已将日期时间值转换为字符串,因为我需要在 SQL 查询中传递它。我们也可以使用它。
你可以考虑SimpleHttpOperatorhttps://airflow.apache.org/_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator。发起http请求就这么简单。您可以通过模板将 execution_date 与端点参数一起传递。
这是另一种没有上下文的方式。使用 dag 的上次执行时间对于计划的 ETL 作业非常有帮助。比如那个'downloads all newly added files'的dag。不要硬编码 datetime.datetime,而是使用 dag 的最后执行日期作为时间过滤器。
Airflow Dags 实际上有一个名为 DagRun 的 class,可以像这样访问它:dag_runs = DagRun.find(dag_id=dag_id)
这是获取最近 运行 的执行时间的简单方法:
def get_most_recent_dag_run(dag_id):
dag_runs = DagRun.find(dag_id=dag_id)
dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
return dag_runs[1] if len(dag_runs) > 1 else None
然后,在您的 pythonOperator 中,您可以通过调用上面创建的函数动态访问 dag 的最后一次执行:
last_execution = get_most_recent_dag_run('svb_to_s3')
现在它是一个变量!