Change Airflow BigQueryInsertJobOperator and BigQueryGetDataOperator Priority to Batch

I have read the Apache Airflow documentation for the BigQuery job operators here (https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/airflow/providers/google/cloud/operators/bigquery.html#BigQueryGetDataOperator), but I cannot find how to change the job priority to batch. How can this be done?

BigQueryExecuteQueryOperator has a priority parameter that accepts INTERACTIVE or BATCH and defaults to INTERACTIVE:

execute_insert_query = BigQueryExecuteQueryOperator(
    task_id="execute_insert_query",
    sql=INSERT_ROWS_QUERY,
    use_legacy_sql=False,
    location=location,
    priority='BATCH',
)
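With BigQueryInsertJobOperator itself, the configuration argument is forwarded to the BigQuery jobs.insert REST API, where query jobs accept a priority field, so it may also be possible to set the priority directly in the job configuration without any operator changes (untested sketch; the query is a placeholder):

```python
# BigQuery job configuration as it would be passed to
# BigQueryInsertJobOperator(configuration=...); in the REST API, query
# jobs accept a "priority" field of "INTERACTIVE" or "BATCH".
configuration = {
    "query": {
        "query": "SELECT 1",  # placeholder query
        "useLegacySql": False,
        "priority": "BATCH",
    }
}
```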

BigQueryInsertJobOperator does not. I think you can create a custom operator that inherits from BigQueryInsertJobOperator and add the priority by overriding the _submit_job method:

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    def __init__(
        self,
        priority: str = 'INTERACTIVE',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.priority = priority

    def _submit_job(
        self,
        hook: BigQueryHook,
        job_id: str,
    ) -> BigQueryJob:
        # hook.insert_job() has no priority argument; BigQuery reads the
        # priority from configuration["query"]["priority"], so inject it
        # into the job configuration before submitting.
        self.configuration.setdefault("query", {})["priority"] = self.priority
        # Submit a new job
        job = hook.insert_job(
            configuration=self.configuration,
            project_id=self.project_id,
            location=self.location,
            job_id=job_id,
        )
        # Wait for the job to complete and fetch the result.
        job.result()
        return job

I haven't tested this, but it should work.
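Usage would then mirror the stock operator, with the extra priority argument (sketch of a DAG task declaration; the task id, query, and location are illustrative):

```python
execute_batch_query = MyBigQueryInsertJobOperator(
    task_id="execute_batch_query",
    configuration={
        "query": {
            "query": "SELECT 1",  # placeholder query
            "useLegacySql": False,
        }
    },
    location="US",  # illustrative
    priority="BATCH",
)
```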