如何通过分隔符拆分 Spark RDD 的行
How to split rows of a Spark RDD by Deliminator
我正在尝试将 Spark 中的数据拆分为 Array[String]
的 RDD 形式。目前我已经将文件加载到 String
的 RDD 中。
> val csvFile = textFile("/input/spam.csv")
我想在 ,
分隔符上拆分。
这个:
val csvFile = textFile("/input/spam.csv").map(line => line.split(","))
return你RDD[Array[String]]
.
如果您需要第一列作为一个 RDD
然后使用 map
函数 return 仅来自数组的第一个索引:
val firstCol = csvFile.map(_.(0))
您应该随意使用 spark-csv library which is able to parse your file considering headers and allow you to specify the delimitor. Also, it makes a pretty good job at infering the schema. I'll let you read the documentation to discover the plenty of options。
这可能看起来像这样:
sqlContext.read.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","your delimitor")
.load(pathToFile)
请注意,这个 returns 一个 DataFrame,您可能必须使用 .rdd
函数将其转换为 rdd。
当然,您必须将包加载到驱动程序中才能运行。
// create spark session
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
// read csv
val df = spark.read
.format("csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.option("delimiter", ",")
.load("/your/csv/dir/simplecsv.csv")
// convert dataframe to rdd[row]
val rddRow = df.rdd
// print 2 rows
rddRow.take(2)
// convert df to rdd[string] for specific column
val oneColumn = df.select("colName").as[(String)].rdd
oneColumn.take(2)
// convert df to rdd[string] for multiple columns
val multiColumn = df.select("col1Name","col2Name").as[(String, String)].rdd
multiColumn.take(2)
我正在尝试将 Spark 中的数据拆分为 Array[String]
的 RDD 形式。目前我已经将文件加载到 String
的 RDD 中。
> val csvFile = textFile("/input/spam.csv")
我想在 ,
分隔符上拆分。
这个:
val csvFile = textFile("/input/spam.csv").map(line => line.split(","))
return你RDD[Array[String]]
.
如果您需要第一列作为一个 RDD
然后使用 map
函数 return 仅来自数组的第一个索引:
val firstCol = csvFile.map(_.(0))
您应该随意使用 spark-csv library which is able to parse your file considering headers and allow you to specify the delimitor. Also, it makes a pretty good job at infering the schema. I'll let you read the documentation to discover the plenty of options。
这可能看起来像这样:
sqlContext.read.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","your delimitor")
.load(pathToFile)
请注意,这个 returns 一个 DataFrame,您可能必须使用 .rdd
函数将其转换为 rdd。
当然,您必须将包加载到驱动程序中才能运行。
// create spark session
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
// read csv
val df = spark.read
.format("csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.option("delimiter", ",")
.load("/your/csv/dir/simplecsv.csv")
// convert dataframe to rdd[row]
val rddRow = df.rdd
// print 2 rows
rddRow.take(2)
// convert df to rdd[string] for specific column
val oneColumn = df.select("colName").as[(String)].rdd
oneColumn.take(2)
// convert df to rdd[string] for multiple columns
val multiColumn = df.select("col1Name","col2Name").as[(String, String)].rdd
multiColumn.take(2)