airflow xcom 值从动态任务 ID 到自定义运算符

airflow xcom value into custom operator from dynamic task id

我正在使用 Airflow 自定义运算符和来自任务 ID(动态生成)的 xcom 值。

如何使用 xcom pull 从动态任务 id 中获取值..

for id in ids :
    def dummy_push_function(**context):
    context['ti'].xcom_push(key='some_id', value='abc')
    
    dummy_operator = DummyOperator(
    task_id='Start',
    dag=main_dag
    )
    
    push_function_task = PythonOperator(
        task_id=f'{id}_push_function', --> some dynamic task id 
        provide_context=True,
        python_callable=dummy_push_function,
        op_kwargs={},
        dag=main_dag)
    
   
    push_function_task .set_upstream(dummy_operator)
    
    custom_task = CustomOperator(
            dag=main_dag,
            task_id='get_data',
            provide_context=True,
                url="http://www.google.com/{}".format("{{ task_instance.xcom_pull(task_ids='{id}_push_function', key='some_id') }}")

         )

    custom_task.set_upstream(push_function_task)

如何使用 jinja 语法添加动态任务任务 ID 和格式?

url="http://www.google.com/{}".format("{{ task_instance.xcom_pull(task_ids='{id}_push_function', key='some_id') }}")

问题出在 format 函数内部,您有 {id} 格式化变量需要在 f-string 中使用或后跟 .format 但两者都不是f-string 也没有 .format 函数后跟字符串。

"{{ task_instance.xcom_pull(task_ids='{id}_push_function', key='some_id') }}" 不是 f-string,因此,{id} 打印为文字“{id}”。

但是,在 format 中包含 f-string 有点复杂。我认为通过转义括号更容易使用 1 f-string。

要转义“{”,你需要一个额外的“{”,所以要转义jinja的括号“{{”,你需要“{{{{”。

url=f"http://www.google.com/{{{{ task_instance.xcom_pull(task_ids='{id}_push_function', key='some_id') }}}}"

现在,f-string 将首先被评估,然后传递给 CustomOperator,我假设 urltemplated_fields,它将使用 jinja 转换传递的字符串。

id = 1
f"http://www.google.com/{{{{ task_instance.xcom_pull(task_ids='{id}_push_function', key='some_id') }}}}"

>> # after f-string evaluated
"http://www.google.com/{{ task_instance.xcom_pull(task_ids='1_push_function', key='some_id') }}"