我可以使用 spark-csv 将表示为字符串的 CSV 读取到 Apache Spark 中吗
Can I read a CSV represented as a string into Apache Spark using spark-csv
我知道如何使用 spark-csv (https://github.com/databricks/spark-csv) 将 csv 文件读入 spark,但我已经将 csv 文件表示为字符串,并且想将此字符串直接转换为数据帧。这可能吗?
您可以使用例如将字符串解析为 csv scala-csv:
val myCSVdata : Array[List[String]] =
myCSVString.split('\n').flatMap(CSVParser.parseLine(_))
在这里你可以做更多的处理,数据清理,验证每一行是否解析良好并且具有相同的字段数等......
然后您可以将其设为 RDD
条记录:
val myCSVRDD : RDD[List[String]] = sparkContext.parallelize(msCSVdata)
在这里您可以将您的字符串列表整理成一个案例 class,以更好地反映您的 csv 数据的字段。你应该从这个例子中 Person
s 的创作中得到一些灵感:
我省略了这一步。
然后您可以转换为 DataFrame:
import spark.implicits._
myCSVDataframe = myCSVRDD.toDF()
更新:从 Spark 2 开始。2.x
终于有了使用 Dataset 的正确方法。
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
"""
|id, date, timedump
|1, "2014/01/01 23:00:01",1499959917383
|2, "2014/11/31 12:40:32",1198138008843
""".stripMargin.lines.toList).toDS()
val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.show()
frame.printSchema()
旧的 spark 版本
实际上你可以,尽管它使用的是库内部结构并且没有广泛宣传。只需创建并使用您自己的 CsvParser 实例。
适用于 spark 1.6.0 和 spark-csv_2.10-1.4.0 below
的示例
import com.databricks.spark.csv.CsvParser
val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
.withUseHeader(true)
.withInferSchema(true)
val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
接受的答案在 spark 2.2.0 中对我不起作用,但通过 csvData.lines.toList
引导我找到我需要的东西
val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString
val csvList = streamString.lines.toList
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(csvList.toDS())
.as[SomeCaseClass]
我知道如何使用 spark-csv (https://github.com/databricks/spark-csv) 将 csv 文件读入 spark,但我已经将 csv 文件表示为字符串,并且想将此字符串直接转换为数据帧。这可能吗?
您可以使用例如将字符串解析为 csv scala-csv:
val myCSVdata : Array[List[String]] =
myCSVString.split('\n').flatMap(CSVParser.parseLine(_))
在这里你可以做更多的处理,数据清理,验证每一行是否解析良好并且具有相同的字段数等......
然后您可以将其设为 RDD
条记录:
val myCSVRDD : RDD[List[String]] = sparkContext.parallelize(msCSVdata)
在这里您可以将您的字符串列表整理成一个案例 class,以更好地反映您的 csv 数据的字段。你应该从这个例子中 Person
s 的创作中得到一些灵感:
我省略了这一步。
然后您可以转换为 DataFrame:
import spark.implicits._
myCSVDataframe = myCSVRDD.toDF()
更新:从 Spark 2 开始。2.x 终于有了使用 Dataset 的正确方法。
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
"""
|id, date, timedump
|1, "2014/01/01 23:00:01",1499959917383
|2, "2014/11/31 12:40:32",1198138008843
""".stripMargin.lines.toList).toDS()
val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.show()
frame.printSchema()
旧的 spark 版本
实际上你可以,尽管它使用的是库内部结构并且没有广泛宣传。只需创建并使用您自己的 CsvParser 实例。 适用于 spark 1.6.0 和 spark-csv_2.10-1.4.0 below
的示例 import com.databricks.spark.csv.CsvParser
val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
.withUseHeader(true)
.withInferSchema(true)
val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
接受的答案在 spark 2.2.0 中对我不起作用,但通过 csvData.lines.toList
val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString
val csvList = streamString.lines.toList
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(csvList.toDS())
.as[SomeCaseClass]