如何在其他运算符中使用 Python 运算符中声明的变量?

How to use variables declared in Python Operator in other operators?

我需要在 python 运算符中计算一个值并在其他运算符中使用它,如下所示。但是我收到“dag_var 不存在”的火花提交和电子邮件运算符/

我将 dag_var 声明为 python 可调用对象中的全局变量。但是我无法在其他运营商中访问它。

def get_dag_var(ds, **kwargs):
    global dag_var
    dag_var = kwargs['dag_run'].run_id


with DAG(
    dag_id='sample',
    schedule_interval=None, # executes at 6 AM UTC every day
    start_date=datetime(2021, 1, 1),
    default_args=default_args,
    catchup=False
) as dag:

    get_dag_var = PythonOperator(
        task_id='get_dag_id',
        provide_context=True,
        python_callable=get_dag_var)

   spark_submit = SparkSubmitOperator(application="abc".....
                                      ..
                                      application_args = [dag_var])
                                
            

    failure_notification = EmailOperator(
        task_id = "failure_notification ",
        to='abc@gmail.com',
        subject='Workflow Failes',
        trigger_rule="one_failed",
        html_content= f""" <h3>Failure Mail - {dag_var}</h3> """
    )

    get_dag_var >> spark_submit >> failure_notification 

感谢任何帮助。谢谢。

您可以使用 XComs 在操作员之间共享数据。在您的 get_dag_var 函数中,任何 returned 值都会自动存储为 Airflow 中的 XCom 记录。您可以检查 Admin -> XComs 下的值。

要在以下任务中使用 XCom 值,您可以应用模板:

spark_submit = SparkSubmitOperator(
    application="ABC",
    ...,
    application_args = ["{{ ti.xcom_pull(task_ids='get_dag_id') }}"],
)

{{ }} 定义了一个在运行时评估的模板化字符串。 ti.xcom_pull 将在运行时从 get_dag_id 任务“拉取”XCom 值。

使用模板需要注意一件事:并非所有运算符的参数都是 template-able。非 template-able 参数不会在运行时评估 {{ }}SparkSubmitOperator.application_argsEmailOperator.html_content 是 template-able,这意味着在运行时评估模板化字符串,您将能够提供 XCom 值。检查 template_fields 属性 以便您的操作员了解哪些字段是 template-able 哪些不是。

使用 XComs 需要注意一件事:注意 XCom 值存储在 Airflow Metastore 中,因此请注意不要 return 可能不适合数据库记录的巨大变量。要将 XCom 值存储在与 Airflow Metastore 不同的系统中,请查看 custom XCom backends.