将 return 值从运算符传递给 Airflow 中的后续运算符

Passing return value from operator to following operator in Airflow

我正在尝试为 GoogleCloudStorageToBigQueryOperatorsource_objects 字段提供一个字符串列表,但是使用以下代码时出现错误:

string indices must be integers, not unicode

我不知道的事情:

我想到的事情:

我想做的事:

另外,运营商的某些字段似乎使用了一种称为templated_field的功能,模板字段背后的机制是什么?不只是 PythonOperatorBashOperator 吗?

最后一个,为什么 PythonOperator 没有 return 一个 TaskInstance

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        fileName = kwargs['dag_run'].conf['fileName']
        return [fileName]

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        #source_objects=['data.csv'], 
        source_objects=get_file_name.xcom_pull(context='', task_ids='get_file_name'), 
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_EMPTY')

    bq_load.set_upstream(get_file_name)

我对 Python 和 Airflow 有点陌生。我想这些事情应该是微不足道的。我确定我在这里误解了什么。

经过多次测试,我想出了这个解决方案,感谢 tobi6 的评论为我指明了正确的方向。我不得不使用 template_fields 功能。

当我尝试 return 一个包含单个字符串的列表时,我遇到了连接错误,所以我不得不 return 我的 XCOM 中的一个字符串,并将对 XCOM 的模板调用包围起来括号使结果成为列表。

这是最终代码:

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        return kwargs['dag_run'].conf['fileName']

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        source_objects=["{{ task_instance.xcom_pull(task_ids='get_file_name') }}"],
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_APPEND')

    bq_load.set_upstream(get_file_name)