使用 PythonOperator 的列表输出迭代 Airflow2 中的另一个运算符
Use list output from a PythonOperator to iterate another operator in Airflow2
背景
所以现在我们想先从一个运算符得到一个列表,然后迭代结果和运行另一个运算符。
脚本如下:
def hello_world(ti, execution_date, **context):
# Do sth here and generate the value final_output
ti.xcom_push(key='whatever', value=final_output)
dag = DAG(
"test",
schedule_interval=None,
start_date=datetime.datetime(2021, 5, 17),
catchup=False,
)
with dag:
t1 = PythonOperator(
task_id="hello_world",
python_callable=hello_world,
)
outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"
for x in outcome_list:
t2 = PythonOperator(
task_id="test_{x}",
python_callable=do_sth,
op_kwargs={"input_param": x},
)
t1 >> t2
目前的情况是,我们成功获取了xcom变量。该列表始终包含 60 个元素,这不会导致任何性能问题。
但是,它作为列表的字符串返回。
为了迭代它,我们想将它转换为一个列表并传递给 运行 是 t2
中的运算符的函数
当前期
outcome_list 是通过 jinja 模板生成的,并保存为这样的 str
['user_A US', 'user_B BR' , ..... ]
我们尝试在 DAG 中使用以下函数将 outcome_list 转换为正确的 python 字符串:
outcome_list = outcome_list.strip("[]").split(", ")
它returns错误如下
jinja2.exceptions.TemplateSyntaxError: unexpected end of template, expected ','.
当我们尝试使用 jinja 语法将输出转换为列表时
outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') | list }}"
我们在执行循环时出错,说它是不可迭代的。
这是什么问题,我们应该如何处理?
谢谢你的帮助!!
将 outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"
放在运算符范围之外是行不通的,因为该字符串不会被模板化。
您正在寻找的是在 运行 时间内以 map-reduce 的方式创建任务。
对于气流 <2.3.0:
这是不可能的。您无法根据上一个任务的输出创建 task_id(s)。
对于 Airflow>=2.3.0:
添加了一项新功能AIP-42 Dynamic Task Mapping这允许根据先前任务的输出创建任务。
示例:
from airflow.decorators import task
@task
def make_list():
# This can also be from an API call, checking a database, -- almost anything you like, as long as the
# resulting list/dictionary can be stored in the current XCom backend.
return [1, 2, {"a": "b"}, "str"]
@task
def consumer(arg):
print(repr(arg))
with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
consumer.expand(arg=make_list())
注意:在撰写此答案时,Airflow 2.3.0 尚未发布。但是 2.3.0b1
已发布,因此您可以测试您的代码。我们预计在接下来的几周内发布正式版本。
背景
所以现在我们想先从一个运算符得到一个列表,然后迭代结果和运行另一个运算符。
脚本如下:
def hello_world(ti, execution_date, **context):
# Do sth here and generate the value final_output
ti.xcom_push(key='whatever', value=final_output)
dag = DAG(
"test",
schedule_interval=None,
start_date=datetime.datetime(2021, 5, 17),
catchup=False,
)
with dag:
t1 = PythonOperator(
task_id="hello_world",
python_callable=hello_world,
)
outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"
for x in outcome_list:
t2 = PythonOperator(
task_id="test_{x}",
python_callable=do_sth,
op_kwargs={"input_param": x},
)
t1 >> t2
目前的情况是,我们成功获取了xcom变量。该列表始终包含 60 个元素,这不会导致任何性能问题。 但是,它作为列表的字符串返回。
为了迭代它,我们想将它转换为一个列表并传递给 运行 是 t2
中的运算符的函数当前期
outcome_list 是通过 jinja 模板生成的,并保存为这样的 str
['user_A US', 'user_B BR' , ..... ]
我们尝试在 DAG 中使用以下函数将 outcome_list 转换为正确的 python 字符串:
outcome_list = outcome_list.strip("[]").split(", ")
它returns错误如下
jinja2.exceptions.TemplateSyntaxError: unexpected end of template, expected ','.
当我们尝试使用 jinja 语法将输出转换为列表时
outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') | list }}"
我们在执行循环时出错,说它是不可迭代的。
这是什么问题,我们应该如何处理? 谢谢你的帮助!!
将 outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"
放在运算符范围之外是行不通的,因为该字符串不会被模板化。
您正在寻找的是在 运行 时间内以 map-reduce 的方式创建任务。
对于气流 <2.3.0:
这是不可能的。您无法根据上一个任务的输出创建 task_id(s)。
对于 Airflow>=2.3.0:
添加了一项新功能AIP-42 Dynamic Task Mapping这允许根据先前任务的输出创建任务。 示例:
from airflow.decorators import task
@task
def make_list():
# This can also be from an API call, checking a database, -- almost anything you like, as long as the
# resulting list/dictionary can be stored in the current XCom backend.
return [1, 2, {"a": "b"}, "str"]
@task
def consumer(arg):
print(repr(arg))
with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
consumer.expand(arg=make_list())
注意:在撰写此答案时,Airflow 2.3.0 尚未发布。但是 2.3.0b1
已发布,因此您可以测试您的代码。我们预计在接下来的几周内发布正式版本。