Airflow 中是否有运算符可以根据 BigQuery 中的查询创建 table?

Is there an operator in Airflow to create a table from a query in BigQuery?

我正在寻找类似

的东西
CreateBQTableOperator(
    query='select * from my_table',
    output_table='my_other_table'
)

我正在寻找现有的运算符或此类运算符的代码。运算符应采用另一个参数来决定是否删除 table(如果 table 存在),然后再重新创建它或将查询附加到当前 table.

对于 Airflow >= 1.10,您可以使用 BigQueryInsertJobOperator 此运算符使用 JobConfigurationQuery 您可以使用 configuration 参数配置 API 支持的任何选项:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

execute_query_save = BigQueryInsertJobOperator(
    task_id="execute_query_save",
    configuration={
        "query": {
            "query": "select * from my_table",
            "useLegacySql": False,
            "writeDisposition": "WRITE_EMPTY",
            'destinationTable': {
                'projectId': "my-project",
                'datasetId': "my_data_set",
                'tableId': "table2"
            },
        }
    },
)

对于较旧的 Airflow 版本,您可以使用 BigQueryExecuteQueryOperator:

运营商有destination_dataset_table:

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

execute_query_save = BigQueryExecuteQueryOperator(
    task_id="execute_query_save",
    sql="SELECT * FROM my_data_set.table1",
    use_legacy_sql=False,
    destination_dataset_table="my_data_set.table2",
    location="southamerica-east1",
    write_disposition="WRITE_EMPTY",
    create_disposition="CREATE_IF_NEEDED",
)

您可以通过设置参数值来控制请求的行为(参考 Google docs 中的值)。

write_disposition 选项是:

WRITE_TRUNCATE:如果 table 已经存在,BigQuery 会覆盖 table 数据并使用查询结果中的架构。

WRITE_APPEND:如果 table 已经存在,BigQuery 会将数据附加到 table。

WRITE_EMPTY:如果table已经存在并且包含数据,作业结果返回'duplicate'错误。

create_disposition 选项是:

CREATE_IF_NEEDED:如果 table 不存在,BigQuery 会创建 table.

CREATE_NEVER: table 必须已经存在。否则,作业结果中会返回 'notFound' 错误。