Trigger spark submit jobs from airflow on Dataproc Cluster without SSH
Currently, I am executing my spark-submit commands from Airflow using the BashOperator with a bash_command that SSHes into the cluster. However, our client does not allow us to SSH into the cluster. Is it possible to run spark-submit jobs on the cluster from Airflow without SSH?
You can use the DataprocSubmitJobOperator to submit jobs from Airflow without SSH. Just make sure to pass the correct parameters to the operator. Note that the job parameter is a dictionary based on the Dataproc Job resource, so you can use this operator to submit different job types such as PySpark, Pig, Hive, and so on (a Spark JAR variant is sketched at the end of this answer).
The code below submits a PySpark job:
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

PROJECT_ID = "my-project"
CLUSTER_NAME = "airflow-cluster"  # name of created dataproc cluster
PYSPARK_URI = "gs://dataproc-examples/pyspark/hello-world/hello-world.py"  # public sample script
REGION = "us-central1"

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'submit_dataproc_spark',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    submit_dataproc_job = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
    )

    submit_dataproc_job
Airflow run:
Airflow logs:
Dataproc job:
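If you need the spark-submit equivalent for a JAR-based Spark application rather than PySpark, only the job dictionary changes: use the spark_job field of the Dataproc Job resource instead of pyspark_job. The sketch below is a minimal example under that assumption; the JAR path, arguments, and task_id are placeholders rather than values from the question, and it reuses the PROJECT_ID, CLUSTER_NAME, and REGION variables defined above.

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        # Placeholder JAR URI and arguments -- replace with your own artifact.
        "main_jar_file_uri": "gs://my-bucket/jars/my-spark-app.jar",
        "args": ["--input", "gs://my-bucket/input/"],
    },
}

submit_spark_jar_job = DataprocSubmitJobOperator(
    task_id="spark_jar_task", job=SPARK_JOB, region=REGION, project_id=PROJECT_ID
)

Because the operator talks to the Dataproc API directly, no SSH access to the cluster is required for either variant.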