Error: Unable to find encoder for type org.apache.spark.sql.Dataset[(String, Long)]

Error: Unable to find encoder for type org.apache.spark.sql.Dataset[(String, Long)]

以下数据集比较测试失败并出现错误:

Error:(55, 38) Unable to find encoder for type org.apache.spark.sql.Dataset[(String, Long)]. An implicit Encoder[org.apache.spark.sql.Dataset[(String, Long)]] is needed to store org.apache.spark.sql.Dataset[(String, Long)] instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    ).toDF("lower(word)", "count").as[Dataset[(String, Long)]]
Error:(55, 38) not enough arguments for method as: (implicit evidence: org.apache.spark.sql.Encoder[org.apache.spark.sql.Dataset[(String, Long)]])org.apache.spark.sql.Dataset[org.apache.spark.sql.Dataset[(String, Long)]].
Unspecified value parameter evidence.
    ).toDF("lower(word)", "count").as[Dataset[(String, Long)]]

测试

如您所见,我尝试为 (String, Long) 创建 Kryo 编码器

/**
 * Spec for `WordCountDSApp.countWords`.
 *
 * NOTE(review): this is the failing version of the test. The compiler errors reported for it
 * ("Unable to find encoder for type org.apache.spark.sql.Dataset[(String, Long)]") point at the
 * `.as[Dataset[(String, Long)]]` call used to build `expectedDF` below.
 */
class WordCountDSAppTestSpec extends FlatSpec with SparkSessionTestWrapper with DatasetComparer {

  // Per the compiler message, encoders for primitive and Product types come from this import.
  import spark.implicits._

  "countWords" should "return count of each word" in {

    // Input rows as (line, word) pairs; includes mixed-case duplicates and an empty word.
    val wordsDF = Seq(
      ("one", "one"),
      ("two", "two"),
      ("three Three", "three"),
      ("three Three", "Three"),
      ("", "")
    ).toDF("line", "word").as[LineAndWord]

    // Attempt to supply an encoder for (String, Long) through Kryo.
    // NOTE(review): the `as` call below asks for an Encoder[Dataset[(String, Long)]], so this
    // Encoder[(String, Long)] does not satisfy it and the implicit search still fails.
    implicit val tupleEncoder = org.apache.spark.sql.Encoders.kryo[(String, Long)]
    // NOTE(review): the type argument of `as` is used as the element type of the resulting
    // Dataset (see the reported signature: `as` returns Dataset[Dataset[(String, Long)]] here).
    // The intended call is presumably `.as[(String, Long)]` -- TODO confirm and fix.
    val expectedDF = Seq(
      ("one", 1L),
      ("two", 1L),
      ("three", 2L)
    ).toDF("lower(word)", "count").as[Dataset[(String, Long)]]

    val actualDF = WordCountDSApp.countWords(wordsDF)

    // Row order is not significant for the comparison.
    assertSmallDatasetEquality(actualDF, expectedDF, orderedComparison = false)
  }
}

正在测试的 Spark 应用程序

import com.aravind.oss.Logging
import com.aravind.oss.eg.wordcount.spark.WordCountUtil.{WhitespaceRegex, getClusterCfg, getPaths, getSparkSession}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{explode, split}

/**
 * Word count written against the Dataset API, with one case class per dataset shape.
 */
object WordCountDSApp extends App with Logging {

  /** Row type of the raw input: a single `line` field. */
  case class Line(line: String)

  /** Row type produced by [[toWords]]: the source line together with one of its words. */
  case class LineAndWord(line: String, word: String)

  logInfo("WordCount with Dataset API and multiple Case classes")
  val paths = getPaths(args)
  val cluster = getClusterCfg(args)

  if (paths.size > 1) logInfo("More than one file to process")
  logInfo(s"Path(s): $paths")
  logInfo(s"Cluster: $cluster")

  val spark = getSparkSession("WordCountDSApp", cluster)

  import spark.implicits._

  // The case class <code>Line</code> SHOULD match the number of columns in the input file.
  val linesDs: Dataset[Line] =
    spark.read.textFile(paths: _*).toDF("line").as[Line]
  logInfo("Dataset before splitting line")
  linesDs.show(false)

  // <code>toWords</code> adds an additional column (word) to the output, so a second case class,
  // <code>LineAndWord</code>, with two properties is needed to represent the two columns.
  // The property names have to match the column names as well.
  val wordDs: Dataset[LineAndWord] = toWords(linesDs)
  logInfo("Dataset after splitting the line into words")
  wordDs.show(false)

  val wordCount = countWords(wordDs)
  wordCount.orderBy($"count(1)".desc).show(false)

  /**
   * Pairs every line with each of the pieces obtained by splitting it on `WhitespaceRegex`.
   *
   * @param linesDs dataset with one `line` column
   * @return dataset with the original `line` column plus a `word` column, one row per word
   */
  def toWords(linesDs: Dataset[Line]): Dataset[LineAndWord] = {
    import linesDs.sparkSession.implicits._
    val wordColumn = explode(split($"line", WhitespaceRegex)).as("word")
    linesDs.select($"line", wordColumn).as[LineAndWord]
  }

  /**
   * Counts occurrences of each word, ignoring case.
   *
   * Rows whose word is null or empty are dropped before grouping; the grouping key is the
   * lower-cased word.
   *
   * @param wordsDs dataset of (line, word) rows
   * @return one (lower-cased word, number of occurrences) pair per distinct key
   */
  def countWords(wordsDs: Dataset[LineAndWord]): Dataset[(String, Long)] = {
    import wordsDs.sparkSession.implicits._
    val withWord = wordsDs.filter(entry => entry.word != null)
    val nonEmptyWords = withWord.filter(entry => !entry.word.isEmpty)
    nonEmptyWords.groupByKey(entry => entry.word.toLowerCase).count()
  }

}

你应该调用 .as[Something],而不是 .as[Dataset[Something]]。下面是可以正常运行的版本:


"countWords" should "return count of each word" in {
  import org.apache.spark.sql.{Encoder, Encoders}
  import spark.implicits._

  // Builds the encoder for a pair out of the encoders of its two components.
  implicit def tuple2[A1, A2](implicit e1: Encoder[A1],
                              e2: Encoder[A2]): Encoder[(A1, A2)] =
    Encoders.tuple[A1, A2](e1, e2)

  // `as` takes the row type, (String, Long), not Dataset[(String, Long)].
  // NOTE(review): the column names presumably mirror those of countWords' result -- confirm.
  val expectedRows = Seq(("one", 1L), ("two", 1L), ("three", 2L))
  val expectedDF = expectedRows.toDF("value", "count(1)").as[(String, Long)]

  // Input rows as (line, word) pairs; includes mixed-case duplicates and an empty word.
  val inputWords = Seq(
    ("one", "one"),
    ("two", "two"),
    ("three Three", "three"),
    ("three Three", "Three"),
    ("", "")
  ).toDF("line", "word").as[LineAndWord]

  val actualDF = WordCountDSApp.countWords(inputWords)

  // Print both sides so a mismatch is easy to inspect in the test output.
  actualDF.show()
  expectedDF.show()

  // Row order is not significant for the comparison.
  assertSmallDatasetEquality(actualDF, expectedDF, orderedComparison = false)
}