如何在 Spark 2.X 数据集中创建自定义编码器?
How to create a custom Encoder in Spark 2.X Datasets?
Spark 数据集从 Row 移到 Encoder
的 Pojo's/primitives。 Catalyst
引擎使用 ExpressionEncoder
来转换 SQL 表达式中的列。然而,似乎没有 Encoder
的其他子类可用作我们自己的实现的模板。
这是一个在 Spark 1.X / DataFrames 中没有编译的代码示例:
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[Byte@unchecked] => arr
case _ => {
log.error("Unsupport value type")
null
}
}
(id, label, channels, height, width, data)
}).persist(StorageLevel.DISK_ONLY)
}
我们得到
的编译器错误
Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map(row => {
^
那么 somehow/somewhere 应该有办法
- Define/implement 我们的自定义编码器
- 在
DataFrame
上执行映射时应用它(现在是 Row
类型的数据集)
- 注册编码器以供其他自定义代码使用
我正在寻找成功执行这些步骤的代码。
您是否导入了隐式编码器?
进口spark.implicits._
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder
据我所知,自 1.6 以来并没有真正改变, 中描述的解决方案是唯一可用的选项。尽管如此,您当前的代码应该可以很好地使用产品类型的默认编码器。
要了解为什么您的代码在 1.x 中有效而在 2.0.0 中可能无效,您必须检查签名。在 1.x 中,DataFrame.map
是一种采用函数 Row => T
并将 RDD[Row]
转换为 RDD[T]
.
的方法
在 2.0.0 中 DataFrame.map
也采用类型 Row => T
的函数,但是将 Dataset[Row]
(a.k.a DataFrame
) 转换为 Dataset[T]
因此 T
需要一个 Encoder
。如果您想获得 "old" 行为,您应该明确使用 RDD
:
df.rdd.map(row => ???)
对于 Dataset[Row]
map
参见
我导入了 spark.implicits._ 其中 spark 是 SparkSession,它解决了错误并导入了自定义编码器。
此外,编写自定义编码器是一种我没有尝试过的方法。
工作解决方案:-
创建 SparkSession 并导入以下内容
进口spark.implicits._
Spark 数据集从 Row 移到 Encoder
的 Pojo's/primitives。 Catalyst
引擎使用 ExpressionEncoder
来转换 SQL 表达式中的列。然而,似乎没有 Encoder
的其他子类可用作我们自己的实现的模板。
这是一个在 Spark 1.X / DataFrames 中没有编译的代码示例:
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[Byte@unchecked] => arr
case _ => {
log.error("Unsupport value type")
null
}
}
(id, label, channels, height, width, data)
}).persist(StorageLevel.DISK_ONLY)
}
我们得到
的编译器错误Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map(row => {
^
那么 somehow/somewhere 应该有办法
- Define/implement 我们的自定义编码器
- 在
DataFrame
上执行映射时应用它(现在是Row
类型的数据集) - 注册编码器以供其他自定义代码使用
我正在寻找成功执行这些步骤的代码。
您是否导入了隐式编码器?
进口spark.implicits._
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder
据我所知,自 1.6 以来并没有真正改变,
要了解为什么您的代码在 1.x 中有效而在 2.0.0 中可能无效,您必须检查签名。在 1.x 中,DataFrame.map
是一种采用函数 Row => T
并将 RDD[Row]
转换为 RDD[T]
.
在 2.0.0 中 DataFrame.map
也采用类型 Row => T
的函数,但是将 Dataset[Row]
(a.k.a DataFrame
) 转换为 Dataset[T]
因此 T
需要一个 Encoder
。如果您想获得 "old" 行为,您应该明确使用 RDD
:
df.rdd.map(row => ???)
对于 Dataset[Row]
map
参见
我导入了 spark.implicits._ 其中 spark 是 SparkSession,它解决了错误并导入了自定义编码器。
此外,编写自定义编码器是一种我没有尝试过的方法。
工作解决方案:- 创建 SparkSession 并导入以下内容
进口spark.implicits._