如何在另一个任务气流中使用查询(bigquery operator)的结果

How to use the result of a query (bigquery operator) in another task-airflow

我在 google composser 中有一个项目旨在每天提交。 下面的代码就是这样做的,它工作正常。

with models.DAG('reporte_prueba',
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args) as dag:

    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))
        
    bq_audit_query = bigquery_operator.BigQueryOperator(
        task_id='bq_audit_query',
        sql=query_sql,
        use_legacy_sql=False,
        destination_dataset_table=bq_destination_table_name)

    export_audits_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='export_audits_to_gcs',
        source_project_dataset_table=bq_destination_table_name,
        destination_cloud_storage_uris=[output_file],
        export_format='CSV')
    
    download_file = GCSToLocalFilesystemOperator(
        task_id="download_file",
        object_name='audits.csv',
        bucket='bucket-reportes',
        filename='/home/airflow/gcs/data/audits.csv',
    )
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=['aa@bb.cl'],
        subject="""Reporte de Auditorías Diarias 
        Institución: {institution_report} día {date_report}
        """.format(date_report=date,institution_report=institution),
        html_content="""
        Sres.
        <br>
        Adjunto enviamos archivo con Reporte Transacciones Diarias.
        <br>
        """,
        files=['/home/airflow/gcs/data/audits.csv'])

    delete_bq_table = bash_operator.BashOperator(
        task_id='delete_bq_table',
        bash_command='bq rm -f %s' % bq_destination_table_name,
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)


    (
        make_bq_dataset 
        >> bq_audit_query 
        >> export_audits_to_gcs 
        >> delete_bq_table
    )
    export_audits_to_gcs >> download_file >> email_summary

使用这段代码,我用我需要发送的数据创建了一个 table(后来被删除),然后我将那个 table 作为 csv 文件传递​​给存储。 然后我将 .csv 下载到本地 airflow 目录以通过邮件发送。

我的问题是,我是否可以避免创建 table 并将其存储的部分。因为我不需要它。

例如,使用BigqueryOperator执行查询,并在ariflow中访问结果,从而在本地生成csv,然后发送。

我有生成 CSV 的方法,但我最大的疑问是如何(如果可能的话)访问查询结果或将结果传递给另一个气流任务

虽然我不建议跨任务传递 sql 查询的结果,但气流中的 XComs 通常用于任务之间的通信。

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

您还需要创建一个自定义运算符来 return 查询结果,因为我“相信”BigQueryOperator 不会 return 查询结果。