Error: Unable to find encoder for type org.apache.spark.sql.Dataset[(String, Long)]
The following dataset comparison test fails with the error:
Error:(55, 38) Unable to find encoder for type org.apache.spark.sql.Dataset[(String, Long)]. An implicit Encoder[org.apache.spark.sql.Dataset[(String, Long)]] is needed to store org.apache.spark.sql.Dataset[(String, Long)] instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
).toDF("lower(word)", "count").as[Dataset[(String, Long)]]
Error:(55, 38) not enough arguments for method as: (implicit evidence: org.apache.spark.sql.Encoder[org.apache.spark.sql.Dataset[(String, Long)]])org.apache.spark.sql.Dataset[org.apache.spark.sql.Dataset[(String, Long)]].
Unspecified value parameter evidence.
).toDF("lower(word)", "count").as[Dataset[(String, Long)]]
The test:
As you can see, I tried to create a Kryo encoder for (String, Long):
class WordCountDSAppTestSpec extends FlatSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

  "countWords" should "return count of each word" in {
    val wordsDF = Seq(
      ("one", "one"),
      ("two", "two"),
      ("three Three", "three"),
      ("three Three", "Three"),
      ("", "")
    ).toDF("line", "word").as[LineAndWord]

    implicit val tupleEncoder = org.apache.spark.sql.Encoders.kryo[(String, Long)]
    val expectedDF = Seq(
      ("one", 1L),
      ("two", 1L),
      ("three", 2L)
    ).toDF("lower(word)", "count").as[Dataset[(String, Long)]]

    val actualDF = WordCountDSApp.countWords(wordsDF)

    assertSmallDatasetEquality(actualDF, expectedDF, orderedComparison = false)
  }
}
The Spark application under test:
import com.aravind.oss.Logging
import com.aravind.oss.eg.wordcount.spark.WordCountUtil.{WhitespaceRegex, getClusterCfg, getPaths, getSparkSession}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{explode, split}
object WordCountDSApp extends App with Logging {
  logInfo("WordCount with Dataset API and multiple Case classes")
  val paths = getPaths(args)
  val cluster = getClusterCfg(args)

  if (paths.size > 1) {
    logInfo("More than one file to process")
  }
  logInfo("Path(s): " + paths)
  logInfo("Cluster: " + cluster)

  val spark = getSparkSession("WordCountDSApp", cluster)
  import spark.implicits._

  /*
   case class <code>Line</code> SHOULD match the number of columns in the input file
   */
  val linesDs: Dataset[Line] = spark.read
    .textFile(paths: _*)
    .toDF("line")
    .as[Line]
  logInfo("Dataset before splitting line")
  linesDs.show(false)

  /*
   <code>toWords</code> adds an additional column (word) to the output, so we need a
   new case class <code>LineAndWord</code> that contains two properties to represent the two columns.
   The names of the properties should match the names of the columns as well.
   */
  val wordDs: Dataset[LineAndWord] = toWords(linesDs)
  logInfo("Dataset after splitting the line into words")
  wordDs.show(false)

  val wordCount = countWords(wordDs)
  wordCount
    .orderBy($"count(1)".desc)
    .show(false)

  def toWords(linesDs: Dataset[Line]): Dataset[LineAndWord] = {
    import linesDs.sparkSession.implicits._

    linesDs
      .select($"line",
        explode(split($"line", WhitespaceRegex)).as("word"))
      .as[LineAndWord]
  }
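  /*
   <code>countWords</code> filters out null/empty words and groups by the lowercased
   word. groupByKey(...).count() yields columns named "value" (the grouping key)
   and "count(1)" (the count), hence the orderBy($"count(1)".desc) above.
   */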
  def countWords(wordsDs: Dataset[LineAndWord]): Dataset[(String, Long)] = {
    import wordsDs.sparkSession.implicits._

    val result = wordsDs
      .filter(_.word != null)
      .filter(!_.word.isEmpty)
      .groupByKey(_.word.toLowerCase)
      .count()

    result
  }

  case class Line(line: String)

  case class LineAndWord(line: String, word: String)
}
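The column names matter for the test comparison: groupByKey renames the grouping key to value, and count() produces a column named count(1), which is why the app sorts by $"count(1)".desc. A quick way to confirm, sketched under the assumption of a SparkSession named spark:

import spark.implicits._

val demo = Seq(LineAndWord("one two", "one")).toDS()
  .groupByKey(_.word.toLowerCase)
  .count()
demo.printSchema()
// Expected, per the column names used elsewhere in this post:
// root
//  |-- value: string (nullable = true)
//  |-- count(1): long (nullable = false)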
You should call .as[Something], not .as[Dataset[Something]]. Here is the working version:
"countWords" should "return count of each word" in {
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._
implicit def tuple2[A1, A2](implicit e1: Encoder[A1],
e2: Encoder[A2]): Encoder[(A1, A2)] =
Encoders.tuple[A1, A2](e1, e2)
val expectedDF = Seq(("one", 1L), ("two", 1L), ("three", 2L))
.toDF("value", "count(1)")
.as[(String, Long)]
val wordsDF1 = Seq(
("one", "one"),
("two", "two"),
("three Three", "three"),
("three Three", "Three"),
("", "")
).toDF("line", "word").as[LineAndWord]
val actualDF = WordCountDSApp.countWords(wordsDF1)
actualDF.show()
expectedDF.show()
assertSmallDatasetEquality(actualDF, expectedDF, orderedComparison = false)
}
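As a usage note, the explicit tuple2 encoder above mirrors what import spark.implicits._ already derives for tuples of encodable types, so the expected Dataset can usually be built with the implicits alone, e.g.:

val expectedDF = Seq(("one", 1L), ("two", 1L), ("three", 2L))
  .toDF("value", "count(1)")
  .as[(String, Long)]

The essential fixes are that the column names ("value", "count(1)") match what countWords actually returns, and that the type passed to .as[...] is the element type (String, Long), not Dataset[(String, Long)].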