如何从气流中停止 Dataproc 作业

How to stop Dataproc Job from airflow

在 Airflow 中,我们可以通过 DataprocSubmitJobOperator 提交数据处理作业。

我们可以在开发环境中停止 dataproc 作业,但不能通过 GCP Console 在生产环境中停止。

有没有办法,如果提供了数据处理作业 ID 作为参数,我们可以直接通过 Airflow 终止数据处理作业。

Airflow Dataproc 似乎不包含取消作业的操作员。参见 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataproc.html。也许您可以在气流社区提出功能请求。

目前此操作没有运算符,但 DataprocHook 具有 cancel_job 功能,因此您可以创建自定义运算符:

class MyDataprocCancelJobOperator(BaseOperator):
    """ Starts a job cancellation request."""

    template_fields: Sequence[str] = ("region", "project_id", "impersonation_chain")

    def __init__(
        self,
        *,
        job_id: str,
        project_id: str,
        region: Optional[str] = None,
        retry: Union[Retry, _MethodDefault] = DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.job_id = job_id
        self.project_id = project_id
        self.region = region
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: 'Context'):
        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
        job = hook.cancel_job(
            job_id=self.job_id,
            project_id=self.project_id,
            region=self.region,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        return job