Airflow xcom return 列表格式的字符串而不仅仅是字符串值?

Airflow xcom return a string in list format instead of just a string value?

我有一个 returns 字符串值的气流运算符,任务名为 'task1'。所以执行后我进入 xcom 并检查 return_value 和它只是一个字符串(下面的屏幕截图)。

xcom key/value screenshot

然后我有一个名为 task2 的运算符从 task1 的 xcom 值中获取输入,如下所示:

"{{ ti.xcom_pull(task_ids=['task1'],key='return_value')}}"

问题是它得到的值是一个转换为字符串的列表。

xcom 中的值:这只是一个字符串

xcom pull返回值(jinga模板版):['this is just a string']

那么有没有一种方法可以更新上面显示的 xcom pull(jinga 版本)以仅提取值?我无法在传递给它的运算符内部访问,或者我可以放入一些逻辑将字符串转换为列表,然后仅获取值(但这并不理想,无论如何也不是一个选项)。

此外,我认为它的工作提到我尝试做类似的事情但是使用 Python 运算符然后使用 python 代码中的内容执行 xcom pull 并且返回值就好了.所以我不确定为什么使用 Jinja 模板的 xcom pull 会这样做以及我如何解决这个问题?我希望我可以做一些我不知道的事情来轻松获得我想要的输出。有效的 python 运算符代码如下(仅供参考...)

def python_code_task3(**context): 
value = context['ti'].xcom_pull(task_ids='task1', key='return_value') 
logging.info("Value: " + value)

这段代码只是输出我想要的值这只是一个字符串

我真的只想使用 jinga 模板版本并让它检索并传入字符串。不是列表的字符串表示形式,字符串值作为列表中的一项。

在代码片段中提取 XCom 的两种方式略有不同:一种是 task_ids=["task_1"](列表参数),而另一种是 task_ids="task_1"(一个 str 参数)。

task_ids 的参数类型在使用 xcom_pull() 时很重要。 Airflow 将推断,如果您传递一个任务 ID 列表,应该有多个任务可以从中提取 XComs,并将 return 一个包含所有检索到的 XComs 的列表。否则,如果类型只是一个字符串,也就是单个任务 ID,则单个 XCom 值是 returned。这是完成此操作的 link to the code

同样值得注意的是,Jinja 模板值默认呈现为字符串。但是,对于 Airflow 2.1,您可以在 DAG 级别将名为 render_template_as_native_obj 的参数设置为 True。现在,这将在适用时将 Jinja 模板化的值呈现为本机 Python 对象(列表、字典等)。有关此概念的更多信息 here