将 Scala 大小写 class 转换为 PySpark 模式
Converting Scala case class to PySpark schema
给定一个简单的 Scala 案例 class 如下:
package com.foo.storage.schema
case class Person(name: String, age: Int)
可以从案例 class 创建 Spark 模式,如下所示:
import org.apache.spark.sql._
import com.foo.storage.schema.Person
val schema = Encoders.product[Person].schema
我想知道是否可以从 Python/PySpark 中的案例 class 访问模式。我希望做这样的事情 [Python]:
jvm = sc._jvm
py4j_class = jvm.com.foo.storage.schema.Person
jvm.org.apache.spark.sql.Encoders.product(py4j_class)
这会引发错误 com.foo.storage.schema.Person._get_object_id does not exist in the JVM
。 Encoders.product
是 Scala 中的泛型,我不完全确定如何使用 Py4J 指定类型。有没有办法使用案例 class 创建 PySpark 架构?
我发现没有干净/简单的方法可以使用泛型来执行此操作,也不是作为纯 Scala 函数。我最终做的是为案例 class 创建一个可以获取模式的伴随对象。
解决方案
package com.foo.storage.schema
case class Person(name: String, age: Int)
object Person {
def getSchema = Encoders.product[Person].schema
}
可以从 Py4J 调用此函数,但会 return 一个 JavaObject
。它可以用这样的辅助函数转换:
from pyspark.sql.types import StructType
import json
def java_schema_to_python(j_schema):
json_schema = json.loads(ddl.json())
return StructType.fromJson(json_schema)
最后,我们可以提取我们的架构:
j_schema = jvm.com.foo.storage.Person.getSchema()
java_schema_to_python(j_schema)
备选方案
我发现还有一种方法可以做到这一点,但我更喜欢第一种。您可以创建一个通用函数来推断 Scala 中参数的类型,并使用它来推断类型:
object SchemaConverter {
def getSchemaFromType[T <: Product: TypeTag](obj: T): StructType = {
Encoders.product[T].schema
}
}
可以这样调用:
val schema = SchemaConverter.getSchemaFromType(Person("Joe", 42))
我不喜欢这种方法,因为它需要您创建案例的虚拟实例 class。还没有测试过,但我认为上面的函数也可以使用 Py4J 调用。
给定一个简单的 Scala 案例 class 如下:
package com.foo.storage.schema
case class Person(name: String, age: Int)
可以从案例 class 创建 Spark 模式,如下所示:
import org.apache.spark.sql._
import com.foo.storage.schema.Person
val schema = Encoders.product[Person].schema
我想知道是否可以从 Python/PySpark 中的案例 class 访问模式。我希望做这样的事情 [Python]:
jvm = sc._jvm
py4j_class = jvm.com.foo.storage.schema.Person
jvm.org.apache.spark.sql.Encoders.product(py4j_class)
这会引发错误 com.foo.storage.schema.Person._get_object_id does not exist in the JVM
。 Encoders.product
是 Scala 中的泛型,我不完全确定如何使用 Py4J 指定类型。有没有办法使用案例 class 创建 PySpark 架构?
我发现没有干净/简单的方法可以使用泛型来执行此操作,也不是作为纯 Scala 函数。我最终做的是为案例 class 创建一个可以获取模式的伴随对象。
解决方案
package com.foo.storage.schema
case class Person(name: String, age: Int)
object Person {
def getSchema = Encoders.product[Person].schema
}
可以从 Py4J 调用此函数,但会 return 一个 JavaObject
。它可以用这样的辅助函数转换:
from pyspark.sql.types import StructType
import json
def java_schema_to_python(j_schema):
json_schema = json.loads(ddl.json())
return StructType.fromJson(json_schema)
最后,我们可以提取我们的架构:
j_schema = jvm.com.foo.storage.Person.getSchema()
java_schema_to_python(j_schema)
备选方案
我发现还有一种方法可以做到这一点,但我更喜欢第一种。您可以创建一个通用函数来推断 Scala 中参数的类型,并使用它来推断类型:
object SchemaConverter {
def getSchemaFromType[T <: Product: TypeTag](obj: T): StructType = {
Encoders.product[T].schema
}
}
可以这样调用:
val schema = SchemaConverter.getSchemaFromType(Person("Joe", 42))
我不喜欢这种方法,因为它需要您创建案例的虚拟实例 class。还没有测试过,但我认为上面的函数也可以使用 Py4J 调用。