使用 Airflow BigqueryOperator 向 BigQuery 表添加标签

Add labels to BigQuery tables using Airflow BigqueryOperator

我必须向 bigquery table 添加标签。我知道可以通过 BigQuery UI 来完成,但是如何通过气流运算符来完成。

用例:用于计费和搜索目的。由于多个团队在同一个项目和数据集下工作,我们需要将各个团队创建的所有 table 组合在一起。由于每个团队的 table 标签不同,因此我们需要标签。

bq_query = BigQueryOperator(bql=sql,
                            destination_dataset_table='my_dataset.my_table'),
                            task_id='bq_query',
                            bigquery_conn_id='my_bq_connection',
                            use_legacy_sql=False,
                            write_disposition='WRITE_TRUNCATE',
                            create_disposition='CREATE_IF_NEEDED',
                            query_params={})

这里我想给这个目的地添加标签tabledestination_dataset_table='my_dataset.my_table'

我确实尝试过link中提到的:https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator.template_fields

from airflow import models, DAG
from airflow.contrib.operators import bigquery_operator, bigquery_to_gcs, bigquery_table_delete_operator
from airflow.operators.python_operator import PythonOperator



# Define Airflow DAG
with dag:

    bq_query = BigQueryOperator(sql='<some query>',
                                destination_dataset_table='my_dataset.my_table'),
                                task_id='bq_query',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                labels={'project_id': 'project_name', 'dag_id': 'dag_name', 'task_id': 'task_name'}
                                query_params={})

编辑

注意:根据文档,BigqueryOperator 和 BigqueryCreateExternalTableOperator 的 labels 定义不同。

BigqueryOperator 的标签定义是 a dictionary containing labels for the job/query, passed to BigQuery,一切顺利。但是我想在 table 创建时添加一个标签。

我们广泛使用 BigqueryOperator,不能使用 BigqueryCreateExternalTableOperator

是否可以使用 BigqueryOperator?或解决方法

随着你正在使用的classairflow.contrib.operators.bigquery_operator.BigQueryOperator,你可以使用参数标签。可以看到支持的参数。

labels – 包含 job/query 标签的字典,传递给 BigQuery

classairflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=, query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs)

您可以使用此 class airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator 创建带有标签

的 table

labels – 包含 table 标签的字典,传递给 BigQuery

classairflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=, query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs)
template_fields= ['bql', 'sql', 'destination_dataset_table', 'labels']

关于这两个 class,您可以看到更多 information

编辑

你可以看到这个例子。

   bq_query = BigQueryOperator(bql=sql,
                                destination_dataset_table='my_dataset.my_table'),
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                template_fields= ['dataset_id', 'table_id', 'project_id', 'gcs_schema_object', 'labels']
                                query_params={})

你可以看到这个example如何使用它。