如何从 H3 边界创建 PolygonRDD?

How to create a PolygonRDD from H3 boundary?

我正在将 Apache Spark 与 Apache Sedona(以前称为 GeoSpark)结合使用,并且正在尝试执行以下操作:

  1. 取一个每行包含纬度和经度的DataFrame(它来自任意来源,既不是PointRDD也不是来自特定文件格式)并将其转换为DataFrame每个点的H3索引。
  2. DataFrame 并创建一个 PolygonRDD 包含每个不同 H3 索引的 H3 单元格边界。

这是我目前拥有的:

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.sedona.core.spatialRDD.PolygonRDD
import org.apache.sedona.sql.utils.SedonaSQLRegistrator
import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
import org.locationtech.jts.geom.{Polygon, GeometryFactory, Coordinate}
import com.uber.h3core.H3Core
import com.uber.h3core.util.GeoCoord

object Main {
  def main(args: Array[String]) {
    val sparkSession: SparkSession = SparkSession
      .builder()
      .config("spark.serializer", classOf[KryoSerializer].getName)
      .config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
      .master("local[*]")
      .appName("Sedona-Analysis")
      .getOrCreate()
    import sparkSession.implicits._

    SedonaSQLRegistrator.registerAll(sparkSession)
    SedonaVizRegistrator.registerAll(sparkSession)

    val df = Seq(
      (-8.01681, -34.92618),
      (-25.59306, -49.39895),
      (-7.17897, -34.86518),
      (-20.24521, -42.14273),
      (-20.24628, -42.14785),
      (-27.01641, -50.94109),
      (-19.72987, -47.94319)
    ).toDF("latitude", "longitude")

    val core: H3Core = H3Core.newInstance()
    val geoFactory = new GeometryFactory()
    val geoToH3 = udf((lat: Double, lng: Double, res: Int) => core.geoToH3(lat, lng, res))

    val trdd = df
      .select(geoToH3($"latitude", $"longitude", lit(7)).as("h3index"))
      .distinct()
      .rdd
      .map(row => {
        val h3 = row.getAs[Long](0)
        val lboundary = core.h3ToGeoBoundary(h3)
        val aboundary = lboundary.toArray(Array.ofDim[GeoCoord](lboundary.size))
        val poly = geoFactory.createPolygon(
          aboundary.map((c: GeoCoord) => new Coordinate(c.lat, c.lng))
        )
        poly.setUserData(h3)
        poly
      })
    val polyRDD = new PolygonRDD(trdd)
    polyRDD.rawSpatialRDD.foreach(println)

    sparkSession.stop()
  }
}

但是,在 运行 sbt assembly 并将输出 jar 提交到 spark-submit 之后,我收到此错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
    at org.apache.spark.rdd.RDD.$anonfun$map(RDD.scala:396)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.map(RDD.scala:395)
    at Main$.main(Main.scala:44)
    at Main.main(Main.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.uber.h3core.H3Core
Serialization stack:
    - object not serializable (class: com.uber.h3core.H3Core, value: com.uber.h3core.H3Core@3407ded1)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Main$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Main$.$anonfun$main:(Lcom/uber/h3core/H3Core;Lorg/locationtech/jts/geom/GeometryFactory;Lorg/apache/spark/sql/Row;)Lorg/locationtech/jts/geom/Polygon;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Lorg/locationtech/jts/geom/Polygon;, numCaptured=2])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class Main$$$Lambda10/0x0000000840d7f040, Main$$$Lambda10/0x0000000840d7f040@4853f592)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 22 more

实现我想要做的事情的正确方法是什么?

所以,基本上只需将 Serializable 特征添加到包含 H3Core 的对象就足够了。此外,我必须调整 Coordinate 数组以相同的点开始和结束。

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.sedona.core.spatialRDD.PolygonRDD
import org.apache.sedona.sql.utils.SedonaSQLRegistrator
import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
import org.locationtech.jts.geom.{Polygon, GeometryFactory, Coordinate}
import com.uber.h3core.H3Core
import com.uber.h3core.util.GeoCoord

object H3 extends Serializable {
  val core = H3Core.newInstance()
  val geoFactory = new GeometryFactory()
}

object Main {
  def main(args: Array[String]) {
    val sparkSession: SparkSession = SparkSession
      .builder()
      .config("spark.serializer", classOf[KryoSerializer].getName)
      .config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
      .master("local[*]")
      .appName("Sedona-Analysis")
      .getOrCreate()
    import sparkSession.implicits._

    SedonaSQLRegistrator.registerAll(sparkSession)
    SedonaVizRegistrator.registerAll(sparkSession)

    val df = Seq(
      (-8.01681, -34.92618),
      (-25.59306, -49.39895),
      (-7.17897, -34.86518),
      (-20.24521, -42.14273),
      (-20.24628, -42.14785),
      (-27.01641, -50.94109),
      (-19.72987, -47.94319)
    ).toDF("latitude", "longitude")

    val geoToH3 = udf((lat: Double, lng: Double, res: Int) => H3.core.geoToH3(lat, lng, res))

    val trdd = df
      .select(geoToH3($"latitude", $"longitude", lit(7)).as("h3index"))
      .distinct()
      .rdd
      .map(row => {
        val h3 = row.getAs[Long](0)
        val lboundary = H3.core.h3ToGeoBoundary(h3)
        val aboundary = lboundary.toArray(Array.ofDim[GeoCoord](lboundary.size))
        val poly = H3.geoFactory.createPolygon({
          val ps = aboundary.map((c: GeoCoord) => new Coordinate(c.lat, c.lng))
          ps :+ ps(0)
        })
        poly.setUserData(h3)
        poly
      })
    
    val polyRDD = new PolygonRDD(trdd)
    polyRDD.rawSpatialRDD.foreach(println)

    sparkSession.stop()
  }
}