使用 Airflow BigqueryOperator 向 BigQuery 表添加标签
Add labels to BigQuery tables using Airflow BigqueryOperator
我必须向 bigquery table 添加标签。我知道可以通过 BigQuery UI 来完成,但是如何通过气流运算符来完成。
用例:用于计费和搜索目的。由于多个团队在同一个项目和数据集下工作,我们需要将各个团队创建的所有 table 组合在一起。由于每个团队的 table 标签不同,因此我们需要标签。
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table='my_dataset.my_table'),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={})
这里我想给这个目的地添加标签tabledestination_dataset_table='my_dataset.my_table'
from airflow import models, DAG
from airflow.contrib.operators import bigquery_operator, bigquery_to_gcs, bigquery_table_delete_operator
from airflow.operators.python_operator import PythonOperator
# Define Airflow DAG
with dag:
bq_query = BigQueryOperator(sql='<some query>',
destination_dataset_table='my_dataset.my_table'),
task_id='bq_query',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
labels={'project_id': 'project_name', 'dag_id': 'dag_name', 'task_id': 'task_name'}
query_params={})
编辑
注意:根据文档,BigqueryOperator 和 BigqueryCreateExternalTableOperator 的 labels
定义不同。
BigqueryOperator 的标签定义是 a dictionary containing labels for the job/query, passed to BigQuery
,一切顺利。但是我想在 table 创建时添加一个标签。
我们广泛使用 BigqueryOperator,不能使用 BigqueryCreateExternalTableOperator
是否可以使用 BigqueryOperator?或解决方法
随着你正在使用的classairflow.contrib.operators.bigquery_operator.BigQueryOperator,你可以使用参数标签。可以看到支持的参数。
labels – 包含 job/query 标签的字典,传递给 BigQuery
classairflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=, query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs)
您可以使用此 class airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator 创建带有标签
的 table
labels – 包含 table 标签的字典,传递给 BigQuery
classairflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=, query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs)
template_fields= ['bql', 'sql', 'destination_dataset_table', 'labels']
关于这两个 class,您可以看到更多 information。
编辑
你可以看到这个例子。
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table='my_dataset.my_table'),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
template_fields= ['dataset_id', 'table_id', 'project_id', 'gcs_schema_object', 'labels']
query_params={})
你可以看到这个example如何使用它。
我必须向 bigquery table 添加标签。我知道可以通过 BigQuery UI 来完成,但是如何通过气流运算符来完成。
用例:用于计费和搜索目的。由于多个团队在同一个项目和数据集下工作,我们需要将各个团队创建的所有 table 组合在一起。由于每个团队的 table 标签不同,因此我们需要标签。
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table='my_dataset.my_table'),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={})
这里我想给这个目的地添加标签tabledestination_dataset_table='my_dataset.my_table'
from airflow import models, DAG
from airflow.contrib.operators import bigquery_operator, bigquery_to_gcs, bigquery_table_delete_operator
from airflow.operators.python_operator import PythonOperator
# Define Airflow DAG
with dag:
bq_query = BigQueryOperator(sql='<some query>',
destination_dataset_table='my_dataset.my_table'),
task_id='bq_query',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
labels={'project_id': 'project_name', 'dag_id': 'dag_name', 'task_id': 'task_name'}
query_params={})
编辑
注意:根据文档,BigqueryOperator 和 BigqueryCreateExternalTableOperator 的 labels
定义不同。
BigqueryOperator 的标签定义是 a dictionary containing labels for the job/query, passed to BigQuery
,一切顺利。但是我想在 table 创建时添加一个标签。
我们广泛使用 BigqueryOperator,不能使用 BigqueryCreateExternalTableOperator
是否可以使用 BigqueryOperator?或解决方法
随着你正在使用的classairflow.contrib.operators.bigquery_operator.BigQueryOperator,你可以使用参数标签。可以看到支持的参数。
labels – 包含 job/query 标签的字典,传递给 BigQuery
classairflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=, query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs)
您可以使用此 class airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator 创建带有标签
的 tablelabels – 包含 table 标签的字典,传递给 BigQuery
classairflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=, query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs)
template_fields= ['bql', 'sql', 'destination_dataset_table', 'labels']
关于这两个 class,您可以看到更多 information。
编辑
你可以看到这个例子。
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table='my_dataset.my_table'),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
template_fields= ['dataset_id', 'table_id', 'project_id', 'gcs_schema_object', 'labels']
query_params={})
你可以看到这个example如何使用它。