Airflow BashOperator 为 return 值并在后续任务中使用
Airflow BashOperator to return value and use in subsequent task
我正在尝试使用 BashOperator 执行带有一组参数的 python 脚本。执行的 python 脚本需要传回一些值,以便下一个任务(也是执行另一个 python 脚本的 BashOperator)可以相应地使用该值作为进程。
我在 xcom 上进行了研究,但没有发现任何可用于从 BashOperator 返回值的东西。
我是 Airflow 的新手,如有任何帮助,我们将不胜感激。
谢谢
XComs 是您用来允许任务在同一个 DAG 运行 中或跨 DAG 运行s 相互通信的工具。
XComs 上的 documentation page 是一个很好的起点。
XComs 在使用 BashOperator
时隐式设置。最后一行输出存储为 XCom,这可以在任何其他运算符中使用。
在运算符中,您会找到模板化字段。这些记录在 Airflow 文档页面和 Operator 本身的文档字符串中。
BashOperator
的 bash_command
参数是模板化字段。这意味着您可以在该领域使用 XComs。
一个 BashOperator
的输出被用作第二个下游 BashOperator
的输入的示例是:
import pendulum
from textwrap import dedent
from airflow.decorators import dag
from airflow.operators.bash_operator import BashOperator
@dag(start_date=pendulum.today(tz="Europe/London"))
def test_dag():
bash_operator_0 = BashOperator(
task_id="bash_operator_0",
bash_command=dedent(
"""
echo "hello-world"
"""
),
)
bash_operator_1 = BashOperator(
task_id="bash_operator_1",
bash_command=dedent(
"""
echo "{{ task_instance.xcom_pull(task_ids='bash_operator_0') }} from bash_operator_1"
"""
),
)
bash_operator_0 >> bash_operator_1
test_dag_failure_dag = test_dag()
bash_operator_1
的日志输出显示 bash_operator_0
的结果正在 bash_operator_1
的命令中使用:
[2022-05-26, 21:53:25 BST] {subprocess.py:74} INFO - Running command: ['bash', '-c', '\necho "hello-world from bash_operator_1"']
[2022-05-26, 21:53:25 BST] {subprocess.py:85} INFO - Output:
[2022-05-26, 21:53:25 BST] {subprocess.py:92} INFO - hello-world from bash_operator_1
[2022-05-26, 21:53:25 BST] {subprocess.py:96} INFO - Command exited with return code 0
[2022-05-26, 21:53:25 BST] {taskinstance.py:1395} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=bash_operator_1, execution_date=20220526T205323, start_date=20220526T205325, end_date=20220526T205325
我正在尝试使用 BashOperator 执行带有一组参数的 python 脚本。执行的 python 脚本需要传回一些值,以便下一个任务(也是执行另一个 python 脚本的 BashOperator)可以相应地使用该值作为进程。
我在 xcom 上进行了研究,但没有发现任何可用于从 BashOperator 返回值的东西。
我是 Airflow 的新手,如有任何帮助,我们将不胜感激。
谢谢
XComs 是您用来允许任务在同一个 DAG 运行 中或跨 DAG 运行s 相互通信的工具。
XComs 上的 documentation page 是一个很好的起点。
XComs 在使用 BashOperator
时隐式设置。最后一行输出存储为 XCom,这可以在任何其他运算符中使用。
在运算符中,您会找到模板化字段。这些记录在 Airflow 文档页面和 Operator 本身的文档字符串中。
BashOperator
的 bash_command
参数是模板化字段。这意味着您可以在该领域使用 XComs。
一个 BashOperator
的输出被用作第二个下游 BashOperator
的输入的示例是:
import pendulum
from textwrap import dedent
from airflow.decorators import dag
from airflow.operators.bash_operator import BashOperator
@dag(start_date=pendulum.today(tz="Europe/London"))
def test_dag():
bash_operator_0 = BashOperator(
task_id="bash_operator_0",
bash_command=dedent(
"""
echo "hello-world"
"""
),
)
bash_operator_1 = BashOperator(
task_id="bash_operator_1",
bash_command=dedent(
"""
echo "{{ task_instance.xcom_pull(task_ids='bash_operator_0') }} from bash_operator_1"
"""
),
)
bash_operator_0 >> bash_operator_1
test_dag_failure_dag = test_dag()
bash_operator_1
的日志输出显示 bash_operator_0
的结果正在 bash_operator_1
的命令中使用:
[2022-05-26, 21:53:25 BST] {subprocess.py:74} INFO - Running command: ['bash', '-c', '\necho "hello-world from bash_operator_1"']
[2022-05-26, 21:53:25 BST] {subprocess.py:85} INFO - Output:
[2022-05-26, 21:53:25 BST] {subprocess.py:92} INFO - hello-world from bash_operator_1
[2022-05-26, 21:53:25 BST] {subprocess.py:96} INFO - Command exited with return code 0
[2022-05-26, 21:53:25 BST] {taskinstance.py:1395} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=bash_operator_1, execution_date=20220526T205323, start_date=20220526T205325, end_date=20220526T205325