使用 PythonOperator 的列表输出迭代 Airflow2 中的另一个运算符

Use list output from a PythonOperator to iterate another operator in Airflow2

背景

所以现在我们想先从一个运算符得到一个列表,然后迭代结果和运行另一个运算符。

脚本如下:

def hello_world(ti, execution_date, **context):
    # Do sth here and generate the value final_output
    ti.xcom_push(key='whatever', value=final_output)


dag = DAG(
    "test",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 5, 17),
    catchup=False,
)

with dag:
    t1 = PythonOperator(
        task_id="hello_world",
        python_callable=hello_world,
    )

    outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"

    for x in outcome_list: 
       t2 = PythonOperator(
           task_id="test_{x}",
           python_callable=do_sth,
           op_kwargs={"input_param": x},
       )

     t1 >> t2

目前的情况是,我们成功获取了xcom变量。该列表始​​终包含 60 个元素,这不会导致任何性能问题。 但是,它作为列表的字符串返回。

为了迭代它,我们想将它转换为一个列表并传递给 运行 是 t2

中的运算符的函数

当前期

outcome_list 是通过 jinja 模板生成的,并保存为这样的 str

['user_A US', 'user_B BR' , ..... ] 

我们尝试在 DAG 中使用以下函数将 outcome_list 转换为正确的 python 字符串:

outcome_list = outcome_list.strip("[]").split(", ")

它returns错误如下

jinja2.exceptions.TemplateSyntaxError: unexpected end of template, expected ','.

当我们尝试使用 jinja 语法将输出转换为列表时

outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') | list }}"

我们在执行循环时出错,说它是不可迭代的。

这是什么问题,我们应该如何处理? 谢谢你的帮助!!

outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}" 放在运算符范围之外是行不通的,因为该字符串不会被模板化。

您正在寻找的是在 运行 时间内以 map-reduce 的方式创建任务。

对于气流 <2.3.0:

这是不可能的。您无法根据上一个任务的输出创建 task_id(s)。

对于 Airflow>=2.3.0:

添加了一项新功能AIP-42 Dynamic Task Mapping这允许根据先前任务的输出创建任务。 示例:

from airflow.decorators import task
@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]

@task
def consumer(arg):
    print(repr(arg))


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

注意:在撰写此答案时,Airflow 2.3.0 尚未发布。但是 2.3.0b1 已发布,因此您可以测试您的代码。我们预计在接下来的几周内发布正式版本。