如何在另一个任务气流中使用查询(bigquery operator)的结果
How to use the result of a query (bigquery operator) in another task-airflow
我在 google composser 中有一个项目旨在每天提交。
下面的代码就是这样做的,它工作正常。
with models.DAG('reporte_prueba',
schedule_interval=datetime.timedelta(weeks=4),
default_args=default_dag_args) as dag:
make_bq_dataset = bash_operator.BashOperator(
task_id='make_bq_dataset',
# Executing 'bq' command requires Google Cloud SDK which comes
# preinstalled in Cloud Composer.
bash_command='bq ls {} || bq mk {}'.format(
bq_dataset_name, bq_dataset_name))
bq_audit_query = bigquery_operator.BigQueryOperator(
task_id='bq_audit_query',
sql=query_sql,
use_legacy_sql=False,
destination_dataset_table=bq_destination_table_name)
export_audits_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
task_id='export_audits_to_gcs',
source_project_dataset_table=bq_destination_table_name,
destination_cloud_storage_uris=[output_file],
export_format='CSV')
download_file = GCSToLocalFilesystemOperator(
task_id="download_file",
object_name='audits.csv',
bucket='bucket-reportes',
filename='/home/airflow/gcs/data/audits.csv',
)
email_summary = email_operator.EmailOperator(
task_id='email_summary',
to=['aa@bb.cl'],
subject="""Reporte de Auditorías Diarias
Institución: {institution_report} día {date_report}
""".format(date_report=date,institution_report=institution),
html_content="""
Sres.
<br>
Adjunto enviamos archivo con Reporte Transacciones Diarias.
<br>
""",
files=['/home/airflow/gcs/data/audits.csv'])
delete_bq_table = bash_operator.BashOperator(
task_id='delete_bq_table',
bash_command='bq rm -f %s' % bq_destination_table_name,
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
(
make_bq_dataset
>> bq_audit_query
>> export_audits_to_gcs
>> delete_bq_table
)
export_audits_to_gcs >> download_file >> email_summary
使用这段代码,我用我需要发送的数据创建了一个 table(后来被删除),然后我将那个 table 作为 csv 文件传递给存储。
然后我将 .csv 下载到本地 airflow 目录以通过邮件发送。
我的问题是,我是否可以避免创建 table 并将其存储的部分。因为我不需要它。
例如,使用BigqueryOperator执行查询,并在ariflow中访问结果,从而在本地生成csv,然后发送。
我有生成 CSV 的方法,但我最大的疑问是如何(如果可能的话)访问查询结果或将结果传递给另一个气流任务
虽然我不建议跨任务传递 sql 查询的结果,但气流中的 XComs 通常用于任务之间的通信。
https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
您还需要创建一个自定义运算符来 return 查询结果,因为我“相信”BigQueryOperator 不会 return 查询结果。
我在 google composser 中有一个项目旨在每天提交。 下面的代码就是这样做的,它工作正常。
with models.DAG('reporte_prueba',
schedule_interval=datetime.timedelta(weeks=4),
default_args=default_dag_args) as dag:
make_bq_dataset = bash_operator.BashOperator(
task_id='make_bq_dataset',
# Executing 'bq' command requires Google Cloud SDK which comes
# preinstalled in Cloud Composer.
bash_command='bq ls {} || bq mk {}'.format(
bq_dataset_name, bq_dataset_name))
bq_audit_query = bigquery_operator.BigQueryOperator(
task_id='bq_audit_query',
sql=query_sql,
use_legacy_sql=False,
destination_dataset_table=bq_destination_table_name)
export_audits_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
task_id='export_audits_to_gcs',
source_project_dataset_table=bq_destination_table_name,
destination_cloud_storage_uris=[output_file],
export_format='CSV')
download_file = GCSToLocalFilesystemOperator(
task_id="download_file",
object_name='audits.csv',
bucket='bucket-reportes',
filename='/home/airflow/gcs/data/audits.csv',
)
email_summary = email_operator.EmailOperator(
task_id='email_summary',
to=['aa@bb.cl'],
subject="""Reporte de Auditorías Diarias
Institución: {institution_report} día {date_report}
""".format(date_report=date,institution_report=institution),
html_content="""
Sres.
<br>
Adjunto enviamos archivo con Reporte Transacciones Diarias.
<br>
""",
files=['/home/airflow/gcs/data/audits.csv'])
delete_bq_table = bash_operator.BashOperator(
task_id='delete_bq_table',
bash_command='bq rm -f %s' % bq_destination_table_name,
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
(
make_bq_dataset
>> bq_audit_query
>> export_audits_to_gcs
>> delete_bq_table
)
export_audits_to_gcs >> download_file >> email_summary
使用这段代码,我用我需要发送的数据创建了一个 table(后来被删除),然后我将那个 table 作为 csv 文件传递给存储。 然后我将 .csv 下载到本地 airflow 目录以通过邮件发送。
我的问题是,我是否可以避免创建 table 并将其存储的部分。因为我不需要它。
例如,使用BigqueryOperator执行查询,并在ariflow中访问结果,从而在本地生成csv,然后发送。
我有生成 CSV 的方法,但我最大的疑问是如何(如果可能的话)访问查询结果或将结果传递给另一个气流任务
虽然我不建议跨任务传递 sql 查询的结果,但气流中的 XComs 通常用于任务之间的通信。
https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
您还需要创建一个自定义运算符来 return 查询结果,因为我“相信”BigQueryOperator 不会 return 查询结果。