如何从气流中停止 Dataproc 作业
How to stop Dataproc Job from airflow
在 Airflow 中,我们可以通过 DataprocSubmitJobOperator 提交数据处理作业。
我们可以在开发环境中停止 dataproc 作业,但不能通过 GCP Console 在生产环境中停止。
有没有办法,如果提供了数据处理作业 ID 作为参数,我们可以直接通过 Airflow 终止数据处理作业。
Airflow Dataproc 似乎不包含取消作业的操作员。参见 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataproc.html。也许您可以在气流社区提出功能请求。
目前此操作没有运算符,但 DataprocHook
具有 cancel_job
功能,因此您可以创建自定义运算符:
class MyDataprocCancelJobOperator(BaseOperator):
""" Starts a job cancellation request."""
template_fields: Sequence[str] = ("region", "project_id", "impersonation_chain")
def __init__(
self,
*,
job_id: str,
project_id: str,
region: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.job_id = job_id
self.project_id = project_id
self.region = region
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
def execute(self, context: 'Context'):
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
job = hook.cancel_job(
job_id=self.job_id,
project_id=self.project_id,
region=self.region,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
return job
在 Airflow 中,我们可以通过 DataprocSubmitJobOperator 提交数据处理作业。
我们可以在开发环境中停止 dataproc 作业,但不能通过 GCP Console 在生产环境中停止。
有没有办法,如果提供了数据处理作业 ID 作为参数,我们可以直接通过 Airflow 终止数据处理作业。
Airflow Dataproc 似乎不包含取消作业的操作员。参见 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataproc.html。也许您可以在气流社区提出功能请求。
目前此操作没有运算符,但 DataprocHook
具有 cancel_job
功能,因此您可以创建自定义运算符:
class MyDataprocCancelJobOperator(BaseOperator):
""" Starts a job cancellation request."""
template_fields: Sequence[str] = ("region", "project_id", "impersonation_chain")
def __init__(
self,
*,
job_id: str,
project_id: str,
region: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.job_id = job_id
self.project_id = project_id
self.region = region
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
def execute(self, context: 'Context'):
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
job = hook.cancel_job(
job_id=self.job_id,
project_id=self.project_id,
region=self.region,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
return job