Airflow BashOperator 为 return 值并在后续任务中使用

Airflow BashOperator to return value and use in subsequent task

我正在尝试使用 BashOperator 执行带有一组参数的 python 脚本。执行的 python 脚本需要传回一些值,以便下一个任务(也是执行另一个 python 脚本的 BashOperator)可以相应地使用该值作为进程。

我在 xcom 上进行了研究,但没有发现任何可用于从 BashOperator 返回值的东西。

我是 Airflow 的新手,如有任何帮助,我们将不胜感激。

谢谢

XComs 是您用来允许任务在同一个 DAG 运行 中或跨 DAG 运行s 相互通信的工具。

XComs 上的 documentation page 是一个很好的起点。

XComs 在使用 BashOperator 时隐式设置。最后一行输出存储为 XCom,这可以在任何其他运算符中使用。

在运算符中,您会找到模板化字段。这些记录在 Airflow 文档页面和 Operator 本身的文档字符串中。

BashOperatorbash_command 参数是模板化字段。这意味着您可以在该领域使用 XComs。

一个 BashOperator 的输出被用作第二个下游 BashOperator 的输入的示例是:

import pendulum
from textwrap import dedent
from airflow.decorators import dag
from airflow.operators.bash_operator import BashOperator


@dag(start_date=pendulum.today(tz="Europe/London"))
def test_dag():
    bash_operator_0 = BashOperator(
        task_id="bash_operator_0",
        bash_command=dedent(
            """
            echo "hello-world"
            """
        ),
    )

    bash_operator_1 = BashOperator(
        task_id="bash_operator_1",
        bash_command=dedent(
            """
            echo "{{ task_instance.xcom_pull(task_ids='bash_operator_0') }} from bash_operator_1"
            """
        ),
    )

    bash_operator_0 >> bash_operator_1


test_dag_failure_dag = test_dag()

bash_operator_1 的日志输出显示 bash_operator_0 的结果正在 bash_operator_1 的命令中使用:

[2022-05-26, 21:53:25 BST] {subprocess.py:74} INFO - Running command: ['bash', '-c', '\necho "hello-world from bash_operator_1"']
[2022-05-26, 21:53:25 BST] {subprocess.py:85} INFO - Output:
[2022-05-26, 21:53:25 BST] {subprocess.py:92} INFO - hello-world from bash_operator_1
[2022-05-26, 21:53:25 BST] {subprocess.py:96} INFO - Command exited with return code 0
[2022-05-26, 21:53:25 BST] {taskinstance.py:1395} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=bash_operator_1, execution_date=20220526T205323, start_date=20220526T205325, end_date=20220526T205325