从 CSV 文件创建 Spark 数据集

Create Spark Dataset from a CSV file

我想从一个简单的 CSV 文件创建一个 Spark 数据集。以下是 CSV 文件的内容:

name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"

这是制作数据集的代码:

var location = "s3a://path_to_csv"

case class City(name: String, state: String, number_of_people: Long)

val cities = spark.read
  .option("header", "true")
  .option("charset", "UTF8")
  .option("delimiter",",")
  .csv(location)
  .as[City]

错误信息如下:"Cannot up cast number_of_people from string to bigint as it may truncate"

Databricks 在 this blog post.

中谈论创建数据集和这个特定的错误消息

Encoders eagerly check that your data matches the expected schema, providing helpful error messages before you attempt to incorrectly process TBs of data. For example, if we try to use a datatype that is too small, such that conversion to an object would result in truncation (i.e. numStudents is larger than a byte, which holds a maximum value of 255) the Analyzer will emit an AnalysisException.

我使用的是Long类型,没想到会看到这个错误信息。

使用模式推断:

val cities = spark.read
  .option("inferSchema", "true")
  ...

或提供架构:

val cities = spark.read
  .schema(StructType(Array(StructField("name", StringType), ...)

或演员:

val cities = spark.read
  .option("header", "true")
  .csv(location)
  .withColumn("number_of_people", col("number_of_people").cast(LongType))
  .as[City]

你的情况 class case class City(name: String, state: String, number_of_people: Long), 你只需要一行

private val cityEncoder = Seq(City("", "", 0)).toDS

然后你编码

val cities = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.csv(location)
.as[City]

就可以了。

这是官方来源[http://spark.apache.org/docs/latest/sql-programming-guide.html#overview][1]