如何访问函数中定义的变量 [airflow Pythonoperator called function] 并在 airflow-template 范围之外使用它?
How can I access a variable defined in a function[airflow Pythonoperator called function] and use it outside the airflow-template scope?
## Section 1 | Import Modules
## Section 2 | DAG Default Arguments
## Section 3 | Instantiate the DAG
## Section 4 | defining Utils
## Section 5 | Task defining
## Section 6 | Defining dependecies
## Section 1 | Import Modules
from airflow import DAG
from datetime import datetime
from airflow.operators.python_operator import PythonOperator
## Section 2 | DAG Default Arguments
default_args = {
'owner': 'Sourav',
'depends_on_past': False,
'start_date': datetime(2021, 6, 11),
'retries': 0,
}
## Section 3 | Instantiate the DAG
dag = DAG('basic_skeleton',
description='basic skeleton of a DAG',
default_args=default_args,
schedule_interval=None,
catchup=False,
tags=['skeleton'],
)
x = 0
## Section 4 | defining Utils
def print_context(**kwargs):
print("hello world")
return "hello world!!!"
def sum(**kwargs):
c = 1+2
return c
def diff(**kwargs):
global c
c = 2-1
return c
## Doubts
x = c
y = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf
## Section 5 | Task defining
with dag:
t_printHello_prejob = PythonOperator(
task_id='t_printHello_prejob',
provide_context=True,
python_callable=print_context,
dag=dag,
)
t_sum_job = PythonOperator(
task_id='t_sum_job',
python_callable=sum,
provide_context=True,
dag=dag
)
## Section 6 | Defining dependecies
t_printHello_prejob>>t_sum_job
现在,我需要知道两件事:
x = c,我正在尝试使用这个变量x来定义一个for循环,用于下一个任务需要拍摄的次数。不知何故,Airflow UI 是从基本编译的 .py 文件呈现的,并且 x 加载的值为 0 而不是 1,即使我在函数中执行 global c
也是如此。有时,airflow UI 偶然显示值为1,我想知道背后的逻辑。如何控制全局变量?
对于每个 dagrun,我想将 conf
从气流模板范围中取出并在全局 python 区域 [非气流模板] 中使用它。我知道,我可以在气流模板中使用 jinja 宏。但是,我需要在气流范围之外访问 conf。
y = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf
该语句为我提供了最新的 dag_run conf。
但是,对我来说,我同时有多个 DAG_runs 运行,所以我可以在这个变量中为那个 dagrun 获取当前的 dag_run conf 吗?
Sourav,告诉我这是否有帮助:
在 Airflow DAG 中,我们通常不会在任务之间共享数据,即使这在技术上是可行的。我们鼓励我们保持每个任务的幂等性,就像函数式编程中的“纯函数”一样。这意味着给定输入 x
,给定任务将始终创建相同的结果。
您在此处定义的 DAG 基本上是数据管道的蓝图。当 DAG 和任务被 Airflow 调度程序评估时,任务将调用的函数是……好吧,还没有调用。直觉上,因此我希望 x
总是等于零,虽然解开为什么它不总是零是一个有趣的谜,但在 DAG 运行 期间改变全局变量并不是 Airflow 的设置去做。
也就是说,可靠地改变 x
或 c
并在任务中使用它的一种简单方法是将其存储在 Airflow 变量中:
from airflow.models.variable import Variable
...
Variable.set('x', 0)
...
def sum(**kwargs):
c = 1+2
return c
def diff(**kwargs):
c = 2-1
Variable.set('c', c)
return c
def a_func_that_uses_c(**kwargs):
"""make sure this function is called in a task _after_ the task calling `diff`"""
c = Variable.get('c')
...
一个问题是 Airflow 变量是字符串,所以如果您要存储一个整数,就像这里一样,您需要 eval(c)
或 int(c)
来获取它。
## Section 1 | Import Modules
## Section 2 | DAG Default Arguments
## Section 3 | Instantiate the DAG
## Section 4 | defining Utils
## Section 5 | Task defining
## Section 6 | Defining dependecies
## Section 1 | Import Modules
from airflow import DAG
from datetime import datetime
from airflow.operators.python_operator import PythonOperator
## Section 2 | DAG Default Arguments
default_args = {
'owner': 'Sourav',
'depends_on_past': False,
'start_date': datetime(2021, 6, 11),
'retries': 0,
}
## Section 3 | Instantiate the DAG
dag = DAG('basic_skeleton',
description='basic skeleton of a DAG',
default_args=default_args,
schedule_interval=None,
catchup=False,
tags=['skeleton'],
)
x = 0
## Section 4 | defining Utils
def print_context(**kwargs):
print("hello world")
return "hello world!!!"
def sum(**kwargs):
c = 1+2
return c
def diff(**kwargs):
global c
c = 2-1
return c
## Doubts
x = c
y = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf
## Section 5 | Task defining
with dag:
t_printHello_prejob = PythonOperator(
task_id='t_printHello_prejob',
provide_context=True,
python_callable=print_context,
dag=dag,
)
t_sum_job = PythonOperator(
task_id='t_sum_job',
python_callable=sum,
provide_context=True,
dag=dag
)
## Section 6 | Defining dependecies
t_printHello_prejob>>t_sum_job
现在,我需要知道两件事:
x = c,我正在尝试使用这个变量x来定义一个for循环,用于下一个任务需要拍摄的次数。不知何故,Airflow UI 是从基本编译的 .py 文件呈现的,并且 x 加载的值为 0 而不是 1,即使我在函数中执行
global c
也是如此。有时,airflow UI 偶然显示值为1,我想知道背后的逻辑。如何控制全局变量?对于每个 dagrun,我想将
conf
从气流模板范围中取出并在全局 python 区域 [非气流模板] 中使用它。我知道,我可以在气流模板中使用 jinja 宏。但是,我需要在气流范围之外访问 conf。y = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf
该语句为我提供了最新的 dag_run conf。 但是,对我来说,我同时有多个 DAG_runs 运行,所以我可以在这个变量中为那个 dagrun 获取当前的 dag_run conf 吗?
Sourav,告诉我这是否有帮助:
在 Airflow DAG 中,我们通常不会在任务之间共享数据,即使这在技术上是可行的。我们鼓励我们保持每个任务的幂等性,就像函数式编程中的“纯函数”一样。这意味着给定输入 x
,给定任务将始终创建相同的结果。
您在此处定义的 DAG 基本上是数据管道的蓝图。当 DAG 和任务被 Airflow 调度程序评估时,任务将调用的函数是……好吧,还没有调用。直觉上,因此我希望 x
总是等于零,虽然解开为什么它不总是零是一个有趣的谜,但在 DAG 运行 期间改变全局变量并不是 Airflow 的设置去做。
也就是说,可靠地改变 x
或 c
并在任务中使用它的一种简单方法是将其存储在 Airflow 变量中:
from airflow.models.variable import Variable
...
Variable.set('x', 0)
...
def sum(**kwargs):
c = 1+2
return c
def diff(**kwargs):
c = 2-1
Variable.set('c', c)
return c
def a_func_that_uses_c(**kwargs):
"""make sure this function is called in a task _after_ the task calling `diff`"""
c = Variable.get('c')
...
一个问题是 Airflow 变量是字符串,所以如果您要存储一个整数,就像这里一样,您需要 eval(c)
或 int(c)
来获取它。