Apache Sedona (Geospark) SQL with Java: ClassNotFoundException during SQL statement

I am using the latest snapshot of Apache Sedona (1.3.2-SNAPSHOT) together with Apache Spark 3.0.1 on a Docker cluster to do some geospatial work.

When trying the first example from the tutorial section (http://sedona.apache.org/tutorial/sql/), I run into a NoClassDefFoundError with a ClassNotFoundException as its cause:

    SparkSession sparkSession = SparkSession.builder()
            .appName("de.oth.GeosparkDemoApplication")
            .master("local")
            .config("spark.serializer", KryoSerializer.class.getName())
            .config("spark.kryo.registrator", GeoSparkVizKryoRegistrator.class.getName())
            .getOrCreate();

    GeoSparkSQLRegistrator.registerAll(sparkSession);
    Dataset<Row> rawDf = sparkSession
            .read()
            .format("csv")
            .option("delimiter", "\t")
            .option("header", "false")
            .load("/spark-apps/usa-county.tsv");
    rawDf.createOrReplaceTempView("rawdf");
    rawDf.show();

    Dataset<Row> spatialDf = sparkSession.sql(
            "SELECT ST_GeomFromWKT(rawdf._c0) AS countyshape, rawdf._c1, rawdf._c2 FROM rawdf");
    spatialDf.createOrReplaceTempView("spatialdf");
    spatialDf.show();

    spatialDf.printSchema();

Error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback$class
at org.apache.spark.sql.geosparksql.expressions.ST_GeomFromWKT.<init>(Constructors.scala:118)
at org.apache.spark.sql.geosparksql.expressions.ST_GeomFromWKT$.apply(Constructors.scala:117)
at org.apache.spark.sql.geosparksql.expressions.ST_GeomFromWKT$.apply(Constructors.scala:117)
at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:121)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1439)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$$anonfun$applyOrElse2.$anonfun$applyOrElse5(Analyzer.scala:1944)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$$anonfun$applyOrElse2.applyOrElse(Analyzer.scala:1944)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$$anonfun$applyOrElse2.applyOrElse(Analyzer.scala:1927)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown(QueryPlan.scala:96)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform(QueryPlan.scala:129)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions(QueryPlan.scala:134)
at scala.collection.TraversableLike.$anonfun$map(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform(QueryPlan.scala:134)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions(QueryPlan.scala:139)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:96)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:87)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply.applyOrElse(Analyzer.scala:1927)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply.applyOrElse(Analyzer.scala:1925)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp(AnalysisHelper.scala:90)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp(AnalysisHelper.scala:90)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:86)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:84)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:1925)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:1923)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute(RuleExecutor.scala:149)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute(RuleExecutor.scala:146)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$adapted(RuleExecutor.scala:138)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:176)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:170)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:130)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack(RuleExecutor.scala:116)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck(Analyzer.scala:154)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed(QueryExecution.scala:68)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql(SparkSession.scala:607)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
at de.oth.GeosparkDemoApplication.main(GeosparkDemoApplication.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 90 more

Executing the SELECT without the ST_GeomFromWKT function, i.e. as a plain SQL statement, works fine. The WKT file is formatted correctly and displays correctly. The build.gradle does contain all the necessary dependencies for Spark and GeoSpark.

Dependencies declared in build.gradle:

    dependencies {
        testCompile group: 'junit', name: 'junit', version: '4.12'
        compile group: 'org.json', name: 'json', version: '20200518'
        compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.0-rc1'
        compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.0.1'
        compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-csv', version: '2.11.3'
        compile group: 'org.apache.spark', name: 'spark-core_2.12', version: '3.0.1'
        compile group: 'org.apache.spark', name: 'spark-sql_2.12', version: '3.0.1'
        compile group: 'org.apache.spark', name: 'spark-streaming_2.12', version: '3.0.1'
        compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.12', version: '3.0.1'
        compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.7'
        compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.7'
        compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.12', version: '3.0.1'
        compile('org.apache.logging.log4j:log4j-slf4j-impl:2.7')
        compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
        compile group: 'org.datasyslab', name: 'geospark', version: '1.3.2-SNAPSHOT'
        compile group: 'org.datasyslab', name: 'geospark-sql_2.3', version: '1.3.2-SNAPSHOT'
        compile group: 'org.datasyslab', name: 'geospark-viz_2.3', version: '1.3.2-SNAPSHOT'
    }

GeoSpark has moved to Apache Sedona. The $class suffix in the missing CodegenFallback$class is the Scala 2.11 trait encoding, which no longer exists in Scala 2.12: the geospark-sql_2.3 artifact was built for Spark 2.3 on Scala 2.11 and is binary-incompatible with Spark 3.0.1 on Scala 2.12. Replace the org.datasyslab GeoSpark dependencies with the Sedona artifacts matching your Spark version (Gradle equivalents and the updated Java code are sketched after the coordinates below):

    <dependency>
      <groupId>org.apache.sedona</groupId>
      <artifactId>sedona-python-adapter-3.0_2.12</artifactId>
      <version>1.0.1-incubating</version>
    </dependency>
    <dependency>
      <groupId>org.apache.sedona</groupId>
      <artifactId>sedona-viz-3.0_2.12</artifactId>
      <version>1.0.1-incubating</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.datasyslab/geotools-wrapper -->
    <dependency>
      <groupId>org.datasyslab</groupId>
      <artifactId>geotools-wrapper</artifactId>
      <version>geotools-24.1</version>
    </dependency>

Source: https://sedona.apache.org/download/maven-coordinates/
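
Since the question builds with Gradle rather than Maven, the same coordinates can be declared in build.gradle. A minimal sketch using the same artifacts and versions as above (the three org.datasyslab geospark dependencies from the question should be removed at the same time):

    dependencies {
        compile group: 'org.apache.sedona', name: 'sedona-python-adapter-3.0_2.12', version: '1.0.1-incubating'
        compile group: 'org.apache.sedona', name: 'sedona-viz-3.0_2.12', version: '1.0.1-incubating'
        compile group: 'org.datasyslab', name: 'geotools-wrapper', version: 'geotools-24.1'
    }

With those artifacts on the classpath, the tutorial code from the question only needs the renamed registrator classes. A minimal sketch, assuming the Sedona 1.0.x class names SedonaKryoRegistrator and SedonaSQLRegistrator (the class name SedonaDemoApplication is illustrative; the file path and app name are carried over from the question):

    import org.apache.sedona.core.serde.SedonaKryoRegistrator;
    import org.apache.sedona.sql.utils.SedonaSQLRegistrator;
    import org.apache.spark.serializer.KryoSerializer;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SedonaDemoApplication {
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder()
                    .appName("de.oth.SedonaDemoApplication")
                    .master("local")
                    .config("spark.serializer", KryoSerializer.class.getName())
                    // Sedona's renamed Kryo registrator (formerly GeoSparkKryoRegistrator)
                    .config("spark.kryo.registrator", SedonaKryoRegistrator.class.getName())
                    .getOrCreate();

            // Registers the ST_* SQL functions (formerly GeoSparkSQLRegistrator.registerAll)
            SedonaSQLRegistrator.registerAll(sparkSession);

            Dataset<Row> rawDf = sparkSession.read()
                    .format("csv")
                    .option("delimiter", "\t")
                    .option("header", "false")
                    .load("/spark-apps/usa-county.tsv");
            rawDf.createOrReplaceTempView("rawdf");

            // The statement that previously failed with NoClassDefFoundError
            Dataset<Row> spatialDf = sparkSession.sql(
                    "SELECT ST_GeomFromWKT(rawdf._c0) AS countyshape, rawdf._c1, rawdf._c2 FROM rawdf");
            spatialDf.show();
            spatialDf.printSchema();
        }
    }

If the SedonaViz drawing functions are needed, spark.kryo.registrator can point at the viz registrator instead (in 1.0.x presumably org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator, mirroring the old GeoSparkVizKryoRegistrator).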