Load Dataset from Dynamically generated Case Class
What's needed:
The number of tables in the source database changes frequently, so I don't want to hand-edit the case classes. Instead I generate them dynamically from Scala code and put them in a package. But right now I cannot read them dynamically. If this worked, I would resolve "com.example.datasources.fileSystemSource.schema.{}" to the members of object schema in a loop.
What's already done:
I have some case classes generated dynamically from the schemas of the database tables, like this:
import java.sql.Timestamp

object schema {
  case class Users(name: String,
                   favorite_color: String,
                   favorite_numbers: Array[Int])

  case class UserData(registration_dttm: Timestamp,
                      id: Int,
                      first_name: String,
                      last_name: String,
                      email: String,
                      gender: String,
                      ip_address: String,
                      cc: String,
                      country: String,
                      birthdate: String,
                      salary: Double,
                      title: String,
                      comments: String)
}
Then I use them as a dynamic type to read in my Load[T] function in Loader.scala, like this:
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

class Load[T <: Product: Encoder](val tableName: String,
                                  val inputPath: String,
                                  val spark: SparkSession,
                                  val saveMode: String,
                                  val outputPath: String,
                                  val metadata: Boolean)
    extends Loader[T] {

  val fileSystemSourceInstance: FileSystem[T] =
    new FileSystem[T](inputPath, spark, saveMode, tableName)

  override def Load: Dataset[T] =
    fileSystemSourceInstance.provideData(metadata, outputPath).as[T]
}
Now, using reflect.api, I can get a TypeTag for my case classes:
import scala.reflect.api
import scala.reflect.runtime.universe._

def stringToTypeTag[A](name: String): TypeTag[A] = {
  val c = Class.forName(name)
  val mirror = runtimeMirror(c.getClassLoader)
  val sym = mirror.staticClass(name)
  val tpe = sym.selfType
  TypeTag(mirror, new api.TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
      if (m eq mirror) tpe.asInstanceOf[U#Type]
      else throw new IllegalArgumentException(
        s"Type tag defined in $mirror cannot be migrated to other mirrors.")
  })
}
So if I now print the type tag of my case class, I get:

val typetagDynamic = stringToTypeTag("com.example.datasources.fileSystemSource.schema.Users")
println(typetagDynamic)
// TypeTag[com.example.datasources.fileSystemSource.schema.Users]
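As a sanity check, a Type built at runtime this way supports ordinary reflection, for example listing the constructor parameters of a case class. Here is a minimal pure-Scala sketch; the Demo class is a hypothetical stand-in for a generated schema class, and only scala-reflect is required:

```scala
import scala.reflect.runtime.universe._

// Stand-in for a dynamically generated schema case class.
case class Demo(name: String, favorite_color: String)

// Read the primary-constructor parameter names from the runtime Type.
val fieldNames: List[String] =
  typeOf[Demo].decl(termNames.CONSTRUCTOR).asMethod
    .paramLists.flatten.map(_.name.toString)

println(fieldNames)  // List(name, favorite_color)
```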
Problem:
I need to read these TypeTags, or the dynamically generated case classes, to encode my Dataset like this:

new Load[typetagDynamic](tableName, inputPath, spark,
                         saveMode,
                         outputPath + tableName,
                         metadata)(Encoders.product[typetagDynamic]).Load

This gives me the error: Cannot resolve symbol typetagDynamic.
If used like this:

new Load[typetagDynamic.type](tableName, inputPath, spark,
                              saveMode,
                              outputPath + tableName,
                              metadata)(Encoders.product[typetagDynamic.type]).Load

it gives me the error: type arguments [T] do not conform to method product's type parameter bounds [T <: Product].
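Both errors come from the same constraint: a Scala type parameter must be a type known at compile time, while typetagDynamic is a runtime value. A minimal sketch of the distinction, with a TypeTag context bound standing in for Encoder (pure Scala, no Spark needed):

```scala
import scala.reflect.runtime.universe._

case class Users(name: String)

// Analogous to Load[T <: Product : Encoder]: T must be a compile-time type.
def label[T <: Product : TypeTag]: String = typeOf[T].typeSymbol.name.toString

val ok = label[Users]  // compiles: Users is a static type

val tag: TypeTag[Users] = typeTag[Users]
// label[tag]      // does not compile: tag is a value, not a type
// label[tag.type] // does not compile: the singleton type tag.type is not
//                 // <: Product, mirroring the Encoders.product bounds error
```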
If you know the type schema.Users only at runtime, try replacing

new Load[schema.Users](tableName, inputPath, spark,
                       saveMode,
                       outputPath + tableName,
                       metadata).Load

with
import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

val currentMirror = runtime.currentMirror

val loadType = typeOf[Load[_]]
val classSymbol = loadType.typeSymbol.asClass
val classMirror = currentMirror.reflectClass(classSymbol)
val constructorSymbol = loadType.decl(termNames.CONSTRUCTOR).asMethod
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)

val toolbox = ToolBox(currentMirror).mkToolBox()

val encoderType = appliedType(
  typeOf[Encoder[_]].typeConstructor.typeSymbol,
  currentMirror.staticClass("com.example.datasources.fileSystemSource.schema.Users").toType
)
val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))

constructorMirror(tableName, inputPath, spark,
                  saveMode,
                  outputPath + tableName,
                  metadata, encoderInstance).asInstanceOf[Load[_]].Load
If this throws

scala.tools.reflect.ToolBoxError: implicit search has failed

you need either:

to define an instance of the type class org.apache.spark.sql.Encoder for Users in its companion object (so that the instance will be in implicit scope):

object Users {
  implicit val usersEnc: Encoder[Users] = spark.implicits.newProductEncoder[Users]
}
or

to import Encoder instances for the case classes via import spark.implicits._. But you need to import them into the local scope of the toolbox rather than the current local scope, so in that case you should replace

val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))

with
val className = "com.example.datasources.fileSystemSource.schema.Users"
val classType = currentMirror.staticClass(className).toType
val encoderInstance = toolbox.eval(
  q"""import path.to.spark.implicits._
      import org.apache.spark.sql.Encoder
      implicitly[Encoder[$classType]]""")
See the full code here:
https://gist.github.com/DmytroMitin/2cad52c27f5360ae9b1e7503d6f6cd00