Airflow 2 使用密钥名称推送 Xcom
Airflow 2 Push Xcom with Key Name
在 Airflow 2 任务流中 API 我可以使用以下代码示例轻松地在任务之间推送和拉取 XCom 值:-
@task(task_id="task_one")
def get_height() -> int:
response = requests.get("https://swapi.dev/api/people/4")
data = json.loads(response.text)
height = int(data["height"])
return height
@task(task_id="task_two")
def check_height(val):
# Show val:
print(f"Value passed in is: {val}")
check_height(get_height())
我可以看到传递给 check_height 的 val 是 202 并且包含在 xcom 默认密钥 'return_value' 中,这在某些时候没问题,但我通常更喜欢使用特定的密钥.
我的问题是如何使用命名密钥推送 XCom?这在以前使用 ti.xcom_push 非常简单,您可以只提供要将值填充到其中的键名,但我不太清楚如何在任务流中实现此目的 api工作流程。
非常感谢任何有关如何执行此操作的指示或(请简单!)示例。
您可以在装饰器中将 ti
设置为:
@task(task_id="task_one", ti)
def get_height() -> int:
response = requests.get("https://swapi.dev/api/people/4")
data = json.loads(response.text)
height = int(data["height"])
# Handle named Xcom
ti.xcom_push("my_key", height)
对于需要深层函数上下文的情况,您也可以使用 get_current_context
。我将在下面的示例中使用它只是为了展示它,但在您的情况下并不是真正需要的。
这是一个工作示例:
import json
from datetime import datetime
import requests
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
DEFAULT_ARGS = {"owner": "airflow"}
@dag(dag_id="Whosebug_dag", default_args=DEFAULT_ARGS, schedule_interval=None, start_date=datetime(2020, 2, 2))
def my_dag():
@task(task_id="task_one")
def get_height() -> int:
response = requests.get("https://swapi.dev/api/people/4")
data = json.loads(response.text)
height = int(data["height"])
# Handle named Xcom
context = get_current_context()
ti = context["ti"]
ti.xcom_push("my_key", height)
return height
@task(task_id="task_two")
def check_height(val):
# Show val:
print(f"Value passed in is: {val}")
#Read from named Xcom
context = get_current_context()
ti = context["ti"]
ti.xcom_pull("task_one")
print(f"Value passed from xcom my_key is: {val}")
check_height(get_height())
my_dag = my_dag()
正在推送两个 xcom(一个用于返回值,一个用于我们选择的键):
在下游打印两个 xcom task_two
:
在 Airflow 2 任务流中 API 我可以使用以下代码示例轻松地在任务之间推送和拉取 XCom 值:-
@task(task_id="task_one")
def get_height() -> int:
response = requests.get("https://swapi.dev/api/people/4")
data = json.loads(response.text)
height = int(data["height"])
return height
@task(task_id="task_two")
def check_height(val):
# Show val:
print(f"Value passed in is: {val}")
check_height(get_height())
我可以看到传递给 check_height 的 val 是 202 并且包含在 xcom 默认密钥 'return_value' 中,这在某些时候没问题,但我通常更喜欢使用特定的密钥.
我的问题是如何使用命名密钥推送 XCom?这在以前使用 ti.xcom_push 非常简单,您可以只提供要将值填充到其中的键名,但我不太清楚如何在任务流中实现此目的 api工作流程。
非常感谢任何有关如何执行此操作的指示或(请简单!)示例。
您可以在装饰器中将 ti
设置为:
@task(task_id="task_one", ti)
def get_height() -> int:
response = requests.get("https://swapi.dev/api/people/4")
data = json.loads(response.text)
height = int(data["height"])
# Handle named Xcom
ti.xcom_push("my_key", height)
对于需要深层函数上下文的情况,您也可以使用 get_current_context
。我将在下面的示例中使用它只是为了展示它,但在您的情况下并不是真正需要的。
这是一个工作示例:
import json
from datetime import datetime
import requests
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
DEFAULT_ARGS = {"owner": "airflow"}
@dag(dag_id="Whosebug_dag", default_args=DEFAULT_ARGS, schedule_interval=None, start_date=datetime(2020, 2, 2))
def my_dag():
@task(task_id="task_one")
def get_height() -> int:
response = requests.get("https://swapi.dev/api/people/4")
data = json.loads(response.text)
height = int(data["height"])
# Handle named Xcom
context = get_current_context()
ti = context["ti"]
ti.xcom_push("my_key", height)
return height
@task(task_id="task_two")
def check_height(val):
# Show val:
print(f"Value passed in is: {val}")
#Read from named Xcom
context = get_current_context()
ti = context["ti"]
ti.xcom_pull("task_one")
print(f"Value passed from xcom my_key is: {val}")
check_height(get_height())
my_dag = my_dag()
正在推送两个 xcom(一个用于返回值,一个用于我们选择的键):
在下游打印两个 xcom task_two
: