Airflow 中是否有运算符可以根据 BigQuery 中的查询创建 table?
Is there an operator in Airflow to create a table from a query in BigQuery?
我正在寻找类似
的东西
CreateBQTableOperator(
query='select * from my_table',
output_table='my_other_table'
)
我正在寻找现有的运算符或此类运算符的代码。运算符应采用另一个参数来决定是否删除 table(如果 table 存在),然后再重新创建它或将查询附加到当前 table.
对于 Airflow >= 1.10,您可以使用 BigQueryInsertJobOperator
此运算符使用 JobConfigurationQuery
您可以使用 configuration
参数配置 API 支持的任何选项:
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
execute_query_save = BigQueryInsertJobOperator(
task_id="execute_query_save",
configuration={
"query": {
"query": "select * from my_table",
"useLegacySql": False,
"writeDisposition": "WRITE_EMPTY",
'destinationTable': {
'projectId': "my-project",
'datasetId': "my_data_set",
'tableId': "table2"
},
}
},
)
对于较旧的 Airflow 版本,您可以使用 BigQueryExecuteQueryOperator
:
运营商有destination_dataset_table
:
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
execute_query_save = BigQueryExecuteQueryOperator(
task_id="execute_query_save",
sql="SELECT * FROM my_data_set.table1",
use_legacy_sql=False,
destination_dataset_table="my_data_set.table2",
location="southamerica-east1",
write_disposition="WRITE_EMPTY",
create_disposition="CREATE_IF_NEEDED",
)
您可以通过设置参数值来控制请求的行为(参考 Google docs 中的值)。
write_disposition
选项是:
WRITE_TRUNCATE:如果 table 已经存在,BigQuery 会覆盖 table 数据并使用查询结果中的架构。
WRITE_APPEND:如果 table 已经存在,BigQuery 会将数据附加到 table。
WRITE_EMPTY:如果table已经存在并且包含数据,作业结果返回'duplicate'错误。
create_disposition
选项是:
CREATE_IF_NEEDED:如果 table 不存在,BigQuery 会创建 table.
CREATE_NEVER: table 必须已经存在。否则,作业结果中会返回 'notFound' 错误。
我正在寻找类似
的东西CreateBQTableOperator(
query='select * from my_table',
output_table='my_other_table'
)
我正在寻找现有的运算符或此类运算符的代码。运算符应采用另一个参数来决定是否删除 table(如果 table 存在),然后再重新创建它或将查询附加到当前 table.
对于 Airflow >= 1.10,您可以使用 BigQueryInsertJobOperator
此运算符使用 JobConfigurationQuery
您可以使用 configuration
参数配置 API 支持的任何选项:
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
execute_query_save = BigQueryInsertJobOperator(
task_id="execute_query_save",
configuration={
"query": {
"query": "select * from my_table",
"useLegacySql": False,
"writeDisposition": "WRITE_EMPTY",
'destinationTable': {
'projectId': "my-project",
'datasetId': "my_data_set",
'tableId': "table2"
},
}
},
)
对于较旧的 Airflow 版本,您可以使用 BigQueryExecuteQueryOperator
:
运营商有destination_dataset_table
:
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
execute_query_save = BigQueryExecuteQueryOperator(
task_id="execute_query_save",
sql="SELECT * FROM my_data_set.table1",
use_legacy_sql=False,
destination_dataset_table="my_data_set.table2",
location="southamerica-east1",
write_disposition="WRITE_EMPTY",
create_disposition="CREATE_IF_NEEDED",
)
您可以通过设置参数值来控制请求的行为(参考 Google docs 中的值)。
write_disposition
选项是:
WRITE_TRUNCATE:如果 table 已经存在,BigQuery 会覆盖 table 数据并使用查询结果中的架构。
WRITE_APPEND:如果 table 已经存在,BigQuery 会将数据附加到 table。
WRITE_EMPTY:如果table已经存在并且包含数据,作业结果返回'duplicate'错误。
create_disposition
选项是:
CREATE_IF_NEEDED:如果 table 不存在,BigQuery 会创建 table.
CREATE_NEVER: table 必须已经存在。否则,作业结果中会返回 'notFound' 错误。