Difficulties in using a Gcloud Composer DAG to run a Spark job
I am using Gcloud Composer and trying to create a DAG that creates a DataProc cluster, runs a simple Spark job, and then tears the cluster down. I am trying to run the Spark Pi example job.
I understand that when calling DataProcSparkOperator I can only choose to define either the main_jar or the main_class property. When I define main_class, the job fails with the error:
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:239)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
When I choose to define the main_jar property instead (that task variant is sketched after the DAG below), the job fails with the error:
Error: No main class set in JAR; please specify one with --class
Run with --help for usage help or --verbose for debug output
I am a bit at a loss as to how to solve this, since I am new to both Spark and DataProc.
My DAG:
import datetime as dt

from airflow import DAG, models
from airflow.contrib.operators import dataproc_operator as dpo
from airflow.utils import trigger_rule

MAIN_JAR = 'file:///usr/lib/spark/examples/jars/spark-examples.jar'
MAIN_CLASS = 'org.apache.spark.examples.SparkPi'
CLUSTER_NAME = 'quickspark-cluster-{{ ds_nodash }}'

yesterday = dt.datetime.combine(
    dt.datetime.today() - dt.timedelta(1),
    dt.datetime.min.time())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': dt.timedelta(seconds=30),
    'project_id': models.Variable.get('gcp_project')
}

with DAG('dataproc_spark_submit', schedule_interval='0 17 * * *',
         default_args=default_dag_args) as dag:

    create_dataproc_cluster = dpo.DataprocClusterCreateOperator(
        project_id = default_dag_args['project_id'],
        task_id = 'create_dataproc_cluster',
        cluster_name = CLUSTER_NAME,
        num_workers = 2,
        zone = models.Variable.get('gce_zone')
    )

    run_spark_job = dpo.DataProcSparkOperator(
        task_id = 'run_spark_job',
        #main_jar = MAIN_JAR,
        main_class = MAIN_CLASS,
        cluster_name = CLUSTER_NAME
    )

    delete_dataproc_cluster = dpo.DataprocClusterDeleteOperator(
        project_id = default_dag_args['project_id'],
        task_id = 'delete_dataproc_cluster',
        cluster_name = CLUSTER_NAME,
        trigger_rule = trigger_rule.TriggerRule.ALL_DONE
    )

    create_dataproc_cluster >> run_spark_job >> delete_dataproc_cluster
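For reference, the main_jar attempt mentioned above was just this same task with the properties swapped (a sketch reconstructed from the commented-out line, not a separately verified configuration):

    # Sketch of the failing main_jar variant, reconstructed from the commented-out
    # line above; this is the attempt that produced "No main class set in JAR".
    run_spark_job = dpo.DataProcSparkOperator(
        task_id = 'run_spark_job',
        main_jar = MAIN_JAR,
        #main_class = MAIN_CLASS,
        cluster_name = CLUSTER_NAME
    )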
I compared it with a successful job submitted through the CLI and noticed that, even though the class populates the Main class or jar field, the path to the jar is also specified in the Jar files field.
Inspecting the operator, I noticed there is also a dataproc_spark_jars parameter, which is not mutually exclusive with main_class:
    run_spark_job = dpo.DataProcSparkOperator(
        task_id = 'run_spark_job',
        dataproc_spark_jars = [MAIN_JAR],
        main_class = MAIN_CLASS,
        cluster_name = CLUSTER_NAME
    )
Adding it did the trick: the job completed successfully.
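This appears to match the shape of the underlying Dataproc SparkJob request: the main class and the jar list are separate fields, and the class can only be found if its jar is listed on the job's classpath. Below is a rough sketch of the kind of request body the operator ends up submitting; field names follow the Dataproc API, the operator's exact internals may differ, and the project and cluster values are illustrative assumptions.

    # Rough sketch of a Dataproc Spark job payload; field names follow the
    # Dataproc API, and the values marked below are illustrative assumptions.
    spark_job = {
        'reference': {'project_id': 'my-gcp-project'},                  # illustrative
        'placement': {'cluster_name': 'quickspark-cluster-20190101'},   # illustrative
        'spark_job': {
            'main_class': 'org.apache.spark.examples.SparkPi',
            # Without this list the examples jar is not on the job classpath,
            # which is consistent with the ClassNotFoundException above.
            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
        },
    }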