如何在其他运算符中使用 Python 运算符中声明的变量?
How to use variables declared in Python Operator in other operators?
我需要在 python 运算符中计算一个值并在其他运算符中使用它,如下所示。但是我收到“dag_var 不存在”的火花提交和电子邮件运算符/
我将 dag_var 声明为 python 可调用对象中的全局变量。但是我无法在其他运营商中访问它。
def get_dag_var(ds, **kwargs):
global dag_var
dag_var = kwargs['dag_run'].run_id
with DAG(
dag_id='sample',
schedule_interval=None, # executes at 6 AM UTC every day
start_date=datetime(2021, 1, 1),
default_args=default_args,
catchup=False
) as dag:
get_dag_var = PythonOperator(
task_id='get_dag_id',
provide_context=True,
python_callable=get_dag_var)
spark_submit = SparkSubmitOperator(application="abc".....
..
application_args = [dag_var])
failure_notification = EmailOperator(
task_id = "failure_notification ",
to='abc@gmail.com',
subject='Workflow Failes',
trigger_rule="one_failed",
html_content= f""" <h3>Failure Mail - {dag_var}</h3> """
)
get_dag_var >> spark_submit >> failure_notification
感谢任何帮助。谢谢。
您可以使用 XComs 在操作员之间共享数据。在您的 get_dag_var
函数中,任何 returned 值都会自动存储为 Airflow 中的 XCom 记录。您可以检查 Admin -> XComs 下的值。
要在以下任务中使用 XCom 值,您可以应用模板:
spark_submit = SparkSubmitOperator(
application="ABC",
...,
application_args = ["{{ ti.xcom_pull(task_ids='get_dag_id') }}"],
)
{{ }}
定义了一个在运行时评估的模板化字符串。 ti.xcom_pull
将在运行时从 get_dag_id
任务“拉取”XCom 值。
使用模板需要注意一件事:并非所有运算符的参数都是 template-able。非 template-able 参数不会在运行时评估 {{ }}
。 SparkSubmitOperator.application_args
和 EmailOperator.html_content
是 template-able,这意味着在运行时评估模板化字符串,您将能够提供 XCom 值。检查 template_fields
属性 以便您的操作员了解哪些字段是 template-able 哪些不是。
使用 XComs 需要注意一件事:注意 XCom 值存储在 Airflow Metastore 中,因此请注意不要 return 可能不适合数据库记录的巨大变量。要将 XCom 值存储在与 Airflow Metastore 不同的系统中,请查看 custom XCom backends.
我需要在 python 运算符中计算一个值并在其他运算符中使用它,如下所示。但是我收到“dag_var 不存在”的火花提交和电子邮件运算符/
我将 dag_var 声明为 python 可调用对象中的全局变量。但是我无法在其他运营商中访问它。
def get_dag_var(ds, **kwargs):
global dag_var
dag_var = kwargs['dag_run'].run_id
with DAG(
dag_id='sample',
schedule_interval=None, # executes at 6 AM UTC every day
start_date=datetime(2021, 1, 1),
default_args=default_args,
catchup=False
) as dag:
get_dag_var = PythonOperator(
task_id='get_dag_id',
provide_context=True,
python_callable=get_dag_var)
spark_submit = SparkSubmitOperator(application="abc".....
..
application_args = [dag_var])
failure_notification = EmailOperator(
task_id = "failure_notification ",
to='abc@gmail.com',
subject='Workflow Failes',
trigger_rule="one_failed",
html_content= f""" <h3>Failure Mail - {dag_var}</h3> """
)
get_dag_var >> spark_submit >> failure_notification
感谢任何帮助。谢谢。
您可以使用 XComs 在操作员之间共享数据。在您的 get_dag_var
函数中,任何 returned 值都会自动存储为 Airflow 中的 XCom 记录。您可以检查 Admin -> XComs 下的值。
要在以下任务中使用 XCom 值,您可以应用模板:
spark_submit = SparkSubmitOperator(
application="ABC",
...,
application_args = ["{{ ti.xcom_pull(task_ids='get_dag_id') }}"],
)
{{ }}
定义了一个在运行时评估的模板化字符串。 ti.xcom_pull
将在运行时从 get_dag_id
任务“拉取”XCom 值。
使用模板需要注意一件事:并非所有运算符的参数都是 template-able。非 template-able 参数不会在运行时评估 {{ }}
。 SparkSubmitOperator.application_args
和 EmailOperator.html_content
是 template-able,这意味着在运行时评估模板化字符串,您将能够提供 XCom 值。检查 template_fields
属性 以便您的操作员了解哪些字段是 template-able 哪些不是。
使用 XComs 需要注意一件事:注意 XCom 值存储在 Airflow Metastore 中,因此请注意不要 return 可能不适合数据库记录的巨大变量。要将 XCom 值存储在与 Airflow Metastore 不同的系统中,请查看 custom XCom backends.