将 Scala 大小写 class 转换为 PySpark 模式

Converting Scala case class to PySpark schema

给定一个简单的 Scala 案例 class 如下:

package com.foo.storage.schema   
case class Person(name: String, age: Int)

可以从案例 class 创建 Spark 模式,如下所示:

import org.apache.spark.sql._
import com.foo.storage.schema.Person  

val schema = Encoders.product[Person].schema

我想知道是否可以从 Python/PySpark 中的案例 class 访问模式。我希望做这样的事情 [Python]:

jvm = sc._jvm
py4j_class = jvm.com.foo.storage.schema.Person 
jvm.org.apache.spark.sql.Encoders.product(py4j_class)

这会引发错误 com.foo.storage.schema.Person._get_object_id does not exist in the JVMEncoders.product 是 Scala 中的泛型,我不完全确定如何使用 Py4J 指定类型。有没有办法使用案例 class 创建 PySpark 架构?

我发现没有干净/简单的方法可以使用泛型来执行此操作,也不是作为纯 Scala 函数。我最终做的是为案例 class 创建一个可以获取模式的伴随对象。

解决方案

package com.foo.storage.schema
case class Person(name: String, age: Int)
object Person {
  def getSchema = Encoders.product[Person].schema
}

可以从 Py4J 调用此函数,但会 return 一个 JavaObject。它可以用这样的辅助函数转换:

from pyspark.sql.types import StructType
import json
def java_schema_to_python(j_schema):
  json_schema = json.loads(ddl.json())
  return StructType.fromJson(json_schema)

最后,我们可以提取我们的架构:

j_schema = jvm.com.foo.storage.Person.getSchema()
java_schema_to_python(j_schema)

备选方案

我发现还有一种方法可以做到这一点,但我更喜欢第一种。您可以创建一个通用函数来推断 Scala 中参数的类型,并使用它来推断类型:

object SchemaConverter {
  def getSchemaFromType[T <: Product: TypeTag](obj: T): StructType = {
     Encoders.product[T].schema
  }
}

可以这样调用:

val schema = SchemaConverter.getSchemaFromType(Person("Joe", 42))

我不喜欢这种方法,因为它需要您创建案例的虚拟实例 class。还没有测试过,但我认为上面的函数也可以使用 Py4J 调用。