使用 spark cassandra 连接器的自定义类型转换器
Custom TypeConverters using spark cassandra connector
我使用 spark cassandra 连接器编写了一个应用程序。现在,当 spark-submit 作业时,我收到错误 java.lang.IllegalArgumentException: requirement failed: No mappable properties found in class: MailBox ,即使我定义了一个类型https://github.com/datastax/spark-cassandra-connector/blob/master/doc/6_advanced_mapper.md 中指定的转换器,我的想法是我需要一个 MailBox 的伴随对象,我在其中定义了一个映射器,但我在文档中找不到它的示例。有谁知道如何解决这个问题?谢谢
代码:
object Test {
case class Size(size: Long) {
if (size < 0) throw new IllegalArgumentException
def +(s: Size): Size = Size(size + s.size)
}
object LongToSizeConverter extends TypeConverter[Size] {
def targetTypeTag = typeTag[Size]
def convertPF = { case long: Long => Size(long) }
}
object SizeToLongConverter extends TypeConverter[Long] {
def targetTypeTag = typeTag[Long]
def convertPF = { case Size(long) => long.toLong }
}
case class MailBox(id: String,totalsize: Size)
case class Id(mailboxid:String)
object StringToIdConverter extends TypeConverter[Id] {
def targetTypeTag = typeTag[Id]
def convertPF = { case str: String => Id(str)
case str: UUID => Id(str.toString) }
}
object IdToStringConverter extends TypeConverter[String] {
def targetTypeTag = typeTag[String]
def convertPF = { case Id(str) => str.toString }
}
def main(args: Array[String]) {
val sc = new SparkContext();
TypeConverter.registerConverter(StringToIdConverter)
TypeConverter.registerConverter(IdToStringConverter)
TypeConverter.registerConverter(LongToSizeConverter)
TypeConverter.registerConverter(SizeToLongConverter)
val test= sc.parallelize(Array(MailBox(Id("1"),Size(10))))
test.saveAsCassandraTable("test","Mailbox")
}
}
首先让我 post 一个快速的工作示例,然后我将说明问题所在
package com.datastax.spark.example
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector.types._
import scala.reflect.runtime.universe._
import java.util.UUID
import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock
case class Size(size: Long) {
if (size < 0) throw new IllegalArgumentException
def +(s: Size): Size = Size(size + s.size)
}
case class MailBox(id: Id,totalsize: Size)
case class Id(mailboxid:String)
object Test {
val LongTypeTag = SparkReflectionLock.synchronized {
implicitly[TypeTag[java.lang.Long]]
}
val SizeTypeTag = SparkReflectionLock.synchronized {
typeTag[Size]
}
val IdTypeTag = SparkReflectionLock.synchronized {
typeTag[Id]
}
val StringTypeTag = SparkReflectionLock.synchronized {
implicitly[TypeTag[String]]
}
object LongToSizeConverter extends TypeConverter[Size] {
def targetTypeTag = SizeTypeTag
def convertPF = { case long: Long => Size(long) }
}
object LongToSizeConverter extends TypeConverter[Size] {
def targetTypeTag = SizeTypeTag
def convertPF = { case long: Long => Size(long) }
}
object SizeToLongConverter extends TypeConverter[java.lang.Long] {
def targetTypeTag = LongTypeTag
def convertPF = { case Size(long) => long.toLong }
}
object StringToIdConverter extends TypeConverter[Id] {
def targetTypeTag = IdTypeTag
def convertPF = {
case str: String => Id(str)
case str: UUID => Id(str.toString)
}
}
object IdToStringConverter extends TypeConverter[String] {
def targetTypeTag = StringTypeTag
def convertPF = { case Id(str) => str.toString }
}
TypeConverter.registerConverter(StringToIdConverter)
TypeConverter.registerConverter(IdToStringConverter)
TypeConverter.registerConverter(LongToSizeConverter)
TypeConverter.registerConverter(SizeToLongConverter)
def main(args: Array[String]) {
val sc = new SparkContext();
val test = sc.parallelize(Array(MailBox(Id("1"),Size(10))))
test.saveToCassandra("ks","mailbox")
}
}
saveAsCassandraTable 不适用于自定义类型
saveAsCassandraTable
使用需要已知类型(不是自定义类型)的 fromType 方法。这是因为 saveAsCassandraTable 根据已知的字段类型创建了一个 Cassandra 列。使用自定义类型转换器,您不会明确声明您的类型和 Cassandra 列之间的(1 到 1)映射,因此无法查找它。由于 saveAsCassandraTable 在插入之前创建了 Cassandra table,它被卡住了,因为它不知道如何制作 table.
为了解决这个问题,我们更改了行
test.saveAsCassandraTable("test","Mailbox")
至
test.saveToCassandraTable("test","Mailbox")
我们在 CQLSH 中预先制作了 table,但您也可以在应用程序中使用 Java 驱动程序来完成此操作。
我们需要转换为 Java 类型
TypeConverter 链接不适用于自定义类型转换器。这意味着我们需要提供从自定义类型到 Java 类型的转换器。为此,我更改了 SizeToLong Converter
object SizeToLongConverter extends TypeConverter[java.lang.Long] {
我们应该防范 Scala 反射缺乏线程安全
我添加了同步块(使用 SparkReflectionLock)以确保
我们不会在那里遇到任何问题。
见
SparkReflectionLock.synchronized
我们需要在对象级别进行注册
为了确保我们的注册发生在执行程序 JVM 上,我将它们移出 "main" 范围。我不确定这有多重要,但最好反映出这应该发生在代码发送到的任何地方,而不仅仅是在 main 方法期间。
我使用 spark cassandra 连接器编写了一个应用程序。现在,当 spark-submit 作业时,我收到错误 java.lang.IllegalArgumentException: requirement failed: No mappable properties found in class: MailBox ,即使我定义了一个类型https://github.com/datastax/spark-cassandra-connector/blob/master/doc/6_advanced_mapper.md 中指定的转换器,我的想法是我需要一个 MailBox 的伴随对象,我在其中定义了一个映射器,但我在文档中找不到它的示例。有谁知道如何解决这个问题?谢谢
代码:
object Test {
case class Size(size: Long) {
if (size < 0) throw new IllegalArgumentException
def +(s: Size): Size = Size(size + s.size)
}
object LongToSizeConverter extends TypeConverter[Size] {
def targetTypeTag = typeTag[Size]
def convertPF = { case long: Long => Size(long) }
}
object SizeToLongConverter extends TypeConverter[Long] {
def targetTypeTag = typeTag[Long]
def convertPF = { case Size(long) => long.toLong }
}
case class MailBox(id: String,totalsize: Size)
case class Id(mailboxid:String)
object StringToIdConverter extends TypeConverter[Id] {
def targetTypeTag = typeTag[Id]
def convertPF = { case str: String => Id(str)
case str: UUID => Id(str.toString) }
}
object IdToStringConverter extends TypeConverter[String] {
def targetTypeTag = typeTag[String]
def convertPF = { case Id(str) => str.toString }
}
def main(args: Array[String]) {
val sc = new SparkContext();
TypeConverter.registerConverter(StringToIdConverter)
TypeConverter.registerConverter(IdToStringConverter)
TypeConverter.registerConverter(LongToSizeConverter)
TypeConverter.registerConverter(SizeToLongConverter)
val test= sc.parallelize(Array(MailBox(Id("1"),Size(10))))
test.saveAsCassandraTable("test","Mailbox")
}
}
首先让我 post 一个快速的工作示例,然后我将说明问题所在
package com.datastax.spark.example
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector.types._
import scala.reflect.runtime.universe._
import java.util.UUID
import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock
case class Size(size: Long) {
if (size < 0) throw new IllegalArgumentException
def +(s: Size): Size = Size(size + s.size)
}
case class MailBox(id: Id,totalsize: Size)
case class Id(mailboxid:String)
object Test {
val LongTypeTag = SparkReflectionLock.synchronized {
implicitly[TypeTag[java.lang.Long]]
}
val SizeTypeTag = SparkReflectionLock.synchronized {
typeTag[Size]
}
val IdTypeTag = SparkReflectionLock.synchronized {
typeTag[Id]
}
val StringTypeTag = SparkReflectionLock.synchronized {
implicitly[TypeTag[String]]
}
object LongToSizeConverter extends TypeConverter[Size] {
def targetTypeTag = SizeTypeTag
def convertPF = { case long: Long => Size(long) }
}
object LongToSizeConverter extends TypeConverter[Size] {
def targetTypeTag = SizeTypeTag
def convertPF = { case long: Long => Size(long) }
}
object SizeToLongConverter extends TypeConverter[java.lang.Long] {
def targetTypeTag = LongTypeTag
def convertPF = { case Size(long) => long.toLong }
}
object StringToIdConverter extends TypeConverter[Id] {
def targetTypeTag = IdTypeTag
def convertPF = {
case str: String => Id(str)
case str: UUID => Id(str.toString)
}
}
object IdToStringConverter extends TypeConverter[String] {
def targetTypeTag = StringTypeTag
def convertPF = { case Id(str) => str.toString }
}
TypeConverter.registerConverter(StringToIdConverter)
TypeConverter.registerConverter(IdToStringConverter)
TypeConverter.registerConverter(LongToSizeConverter)
TypeConverter.registerConverter(SizeToLongConverter)
def main(args: Array[String]) {
val sc = new SparkContext();
val test = sc.parallelize(Array(MailBox(Id("1"),Size(10))))
test.saveToCassandra("ks","mailbox")
}
}
saveAsCassandraTable 不适用于自定义类型
saveAsCassandraTable
使用需要已知类型(不是自定义类型)的 fromType 方法。这是因为 saveAsCassandraTable 根据已知的字段类型创建了一个 Cassandra 列。使用自定义类型转换器,您不会明确声明您的类型和 Cassandra 列之间的(1 到 1)映射,因此无法查找它。由于 saveAsCassandraTable 在插入之前创建了 Cassandra table,它被卡住了,因为它不知道如何制作 table.
为了解决这个问题,我们更改了行
test.saveAsCassandraTable("test","Mailbox")
至
test.saveToCassandraTable("test","Mailbox")
我们在 CQLSH 中预先制作了 table,但您也可以在应用程序中使用 Java 驱动程序来完成此操作。
我们需要转换为 Java 类型
TypeConverter 链接不适用于自定义类型转换器。这意味着我们需要提供从自定义类型到 Java 类型的转换器。为此,我更改了 SizeToLong Converter
object SizeToLongConverter extends TypeConverter[java.lang.Long] {
我们应该防范 Scala 反射缺乏线程安全
我添加了同步块(使用 SparkReflectionLock)以确保 我们不会在那里遇到任何问题。
见
SparkReflectionLock.synchronized
我们需要在对象级别进行注册
为了确保我们的注册发生在执行程序 JVM 上,我将它们移出 "main" 范围。我不确定这有多重要,但最好反映出这应该发生在代码发送到的任何地方,而不仅仅是在 main 方法期间。