Change Airflow BigQueryInsertJobOperator and BigQueryGetDataOperator Priority to Batch
I have read the Apache Airflow documentation for the BigQuery job operators here (https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/airflow/providers/google/cloud/operators/bigquery.html#BigQueryGetDataOperator), but I cannot find how to change a job's priority to batch. How can this be done?
BigQueryExecuteQueryOperator has a priority parameter that can be set to INTERACTIVE or BATCH, and defaults to INTERACTIVE:
execute_insert_query = BigQueryExecuteQueryOperator(
    task_id="execute_insert_query",
    sql=INSERT_ROWS_QUERY,
    use_legacy_sql=False,
    location=location,
    priority='BATCH',
)
BigQueryInsertJobOperator does not. I think you could create a custom operator that inherits from BigQueryInsertJobOperator and add the priority by overriding the _submit_job method:
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    def __init__(
        self,
        priority: str = 'INTERACTIVE',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.priority = priority

    def _submit_job(
        self,
        hook: BigQueryHook,
        job_id: str,
    ) -> BigQueryJob:
        # BigQueryHook.insert_job() does not take a priority argument
        # directly, so set it in the job configuration dict, which is
        # passed straight through to the BigQuery API
        # (configuration.query.priority).
        self.configuration.setdefault("query", {})["priority"] = self.priority
        # Submit a new job
        job = hook.insert_job(
            configuration=self.configuration,
            project_id=self.project_id,
            location=self.location,
            job_id=job_id,
        )
        # Start the job and wait for it to complete and get the result.
        job.result()
        return job
I haven't tested it, but it should work.
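For illustration, here is a minimal usage sketch of the custom operator in a DAG; it is untested, and the task_id, INSERT_ROWS_QUERY, and location values are placeholders you would define yourself:

insert_batch_job = MyBigQueryInsertJobOperator(
    task_id="insert_batch_job",  # hypothetical task id
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,  # assumed to be defined elsewhere
            "useLegacySql": False,
        }
    },
    location=location,  # assumed to be defined elsewhere
    priority="BATCH",
)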