将任务中的 return 值作为参数传递给另一个任务的问题

issue with passing return value from a task as an argument to another task

我有一个任务 return 是一个元组。将该元组的一个元素传递给另一个任务是行不通的。我可以传递整个元组,但不能传递来自 return 值的元素:

    from airflow.decorators import dag, task
    from pendulum import datetime
    
    
    @task
    def create():
        return 1, 2
    
    
    @task
    def consume(one):
        print('arg is', one)
    
    
    @dag(
        schedule_interval='@once',
        start_date=datetime(2022, 4, 10),
    )
    def test_dag():
        out = create()
        consume(out[0])  # does not work: the task gets None as argument
        consume(out)     # this works
    
    dag = test_dag()

TaskFlow the object returned from a TaskFlow function is actually an XComArg. These XComArgs are abstractions over the classic task_instance.xcom_pull(...) retrieval of XComs. Additionally XComArg objects implement __getitem__ 中用于指定除“return_value”(默认值)之外的 XCom 键。

所以在使用 consume(out[0]) 的情况下发生的事情是 Airflow 正在利用 XComArg 对象来检索键为 0 XCom 而不是create() 检索输出,然后 然后 第一项。幕后发生的事情是 task_instance.xcom_pull(task_ids="create", key=0).

是的,这在某种程度上是出乎意料的,并且 完全 不符合经典的 xcom_pull() 方法。 This issue 已开放以尝试实现功能对等。

与此同时,您当然可以访问整个 XComArg,就像您通过 consume(out) 显示的那样,或者您可以将 TaskFlow 函数更新为 return 字典并使用 multiple_outputs 将每个 key/value 对序列化为它们自己的 XComs.

例如:

from pendulum import datetime

from airflow.decorators import dag, task


@task(multiple_outputs=True)
def create():
    return {"one": 1, "two": 2}


@task
def consume(arg):
    print('arg is', arg)


@dag(
    schedule_interval='@once',
    start_date=datetime(2022, 4, 10),
)
def test_dag():
    out = create()
    consume(out["one"])

dag = test_dag()

create 任务创建的独立 XComs:

consume 任务日志:

旁注:如果 TaskFlow 函数也有字典 return 类型注释,则也可以推断出 multiple_outputs。这将根据 return 注释设置 multiple_outputs=True

from typing import Dict

@task
def create() -> Dict[str, int]:
    return {"one": 1, "two": 2}