将任务中的 return 值作为参数传递给另一个任务的问题
issue with passing return value from a task as an argument to another task
我有一个任务 return 是一个元组。将该元组的一个元素传递给另一个任务是行不通的。我可以传递整个元组,但不能传递来自 return 值的元素:
from airflow.decorators import dag, task
from pendulum import datetime
@task
def create():
return 1, 2
@task
def consume(one):
print('arg is', one)
@dag(
schedule_interval='@once',
start_date=datetime(2022, 4, 10),
)
def test_dag():
out = create()
consume(out[0]) # does not work: the task gets None as argument
consume(out) # this works
dag = test_dag()
在 TaskFlow the object returned from a TaskFlow function is actually an XComArg
. These XComArgs
are abstractions over the classic task_instance.xcom_pull(...)
retrieval of XComs
. Additionally XComArg
objects implement __getitem__
中用于指定除“return_value”(默认值)之外的 XCom
键。
所以在使用 consume(out[0])
的情况下发生的事情是 Airflow 正在利用 XComArg
对象来检索键为 0 XCom
的 而不是 从 create()
检索输出,然后 然后 第一项。幕后发生的事情是 task_instance.xcom_pull(task_ids="create", key=0)
.
是的,这在某种程度上是出乎意料的,并且 完全 不符合经典的 xcom_pull()
方法。 This issue 已开放以尝试实现功能对等。
与此同时,您当然可以访问整个 XComArg
,就像您通过 consume(out)
显示的那样,或者您可以将 TaskFlow 函数更新为 return 字典并使用 multiple_outputs
将每个 key/value 对序列化为它们自己的 XComs
.
例如:
from pendulum import datetime
from airflow.decorators import dag, task
@task(multiple_outputs=True)
def create():
return {"one": 1, "two": 2}
@task
def consume(arg):
print('arg is', arg)
@dag(
schedule_interval='@once',
start_date=datetime(2022, 4, 10),
)
def test_dag():
out = create()
consume(out["one"])
dag = test_dag()
从 create
任务创建的独立 XComs:
consume
任务日志:
旁注:如果 TaskFlow 函数也有字典 return 类型注释,则也可以推断出 multiple_outputs
。这将根据 return 注释设置 multiple_outputs=True
:
from typing import Dict
@task
def create() -> Dict[str, int]:
return {"one": 1, "two": 2}
我有一个任务 return 是一个元组。将该元组的一个元素传递给另一个任务是行不通的。我可以传递整个元组,但不能传递来自 return 值的元素:
from airflow.decorators import dag, task
from pendulum import datetime
@task
def create():
return 1, 2
@task
def consume(one):
print('arg is', one)
@dag(
schedule_interval='@once',
start_date=datetime(2022, 4, 10),
)
def test_dag():
out = create()
consume(out[0]) # does not work: the task gets None as argument
consume(out) # this works
dag = test_dag()
在 TaskFlow the object returned from a TaskFlow function is actually an XComArg
. These XComArgs
are abstractions over the classic task_instance.xcom_pull(...)
retrieval of XComs
. Additionally XComArg
objects implement __getitem__
中用于指定除“return_value”(默认值)之外的 XCom
键。
所以在使用 consume(out[0])
的情况下发生的事情是 Airflow 正在利用 XComArg
对象来检索键为 0 XCom
的 而不是 从 create()
检索输出,然后 然后 第一项。幕后发生的事情是 task_instance.xcom_pull(task_ids="create", key=0)
.
是的,这在某种程度上是出乎意料的,并且 完全 不符合经典的 xcom_pull()
方法。 This issue 已开放以尝试实现功能对等。
与此同时,您当然可以访问整个 XComArg
,就像您通过 consume(out)
显示的那样,或者您可以将 TaskFlow 函数更新为 return 字典并使用 multiple_outputs
将每个 key/value 对序列化为它们自己的 XComs
.
例如:
from pendulum import datetime
from airflow.decorators import dag, task
@task(multiple_outputs=True)
def create():
return {"one": 1, "two": 2}
@task
def consume(arg):
print('arg is', arg)
@dag(
schedule_interval='@once',
start_date=datetime(2022, 4, 10),
)
def test_dag():
out = create()
consume(out["one"])
dag = test_dag()
从 create
任务创建的独立 XComs:
consume
任务日志:
旁注:如果 TaskFlow 函数也有字典 return 类型注释,则也可以推断出 multiple_outputs
。这将根据 return 注释设置 multiple_outputs=True
:
from typing import Dict
@task
def create() -> Dict[str, int]:
return {"one": 1, "two": 2}