如何从 Spark 中的 CSV 文件中跳过 header?
How do I skip a header from CSV files in Spark?
假设我为 Spark 上下文提供了三个文件路径以供读取,并且每个文件在第一行中都有一个架构。我们如何跳过 headers 中的模式行?
val rdd=sc.textFile("file1,file2,file3")
现在,我们如何从这个 rdd 中跳过 header 行?
您可以单独加载每个文件,使用 file.zipWithIndex().filter(_._2 > 0)
过滤它们,然后联合所有文件 RDD。
如果文件数量太多,工会可以抛出WhosebugExeption
。
如果第一条记录中只有一行 header 行,那么最有效的过滤方法是:
rdd.mapPartitionsWithIndex {
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
当然,如果有很多文件里面有很多 header 行,这就没有用了。实际上,您可以合并三个这样创建的 RDD。
您也可以只写一个 filter
,它只匹配可能是 header 的一行。这很简单,但效率较低。
Python相当于:
from itertools import islice
rdd.mapPartitionsWithIndex(
lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header) #filter out header
或者,您可以使用 spark-csv 包(或者在 Spark 2.0 中,这或多或少地以 CSV 形式提供)。请注意,这需要每个文件上的 header(如您所愿):
schema = StructType([
StructField('lat',DoubleType(),True),
StructField('lng',DoubleType(),True)])
df = sqlContext.read.format('com.databricks.spark.csv'). \
options(header='true',
delimiter="\t",
treatEmptyValuesAsNulls=True,
mode="DROPMALFORMED").load(input_file,schema=schema)
在 Spark 2.0 中,CSV reader 内置于 Spark 中,因此您可以按如下方式轻松加载 CSV 文件:
spark.read.option("header","true").csv("filePath")
使用 PySpark 中的 filter()
方法过滤掉第一列名称以删除 header:
# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)
# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)
# Check your result
for i in filterDD.take(5) : print (i)
从 Spark 2.0 开始,您可以使用 SparkSession 将其作为单行程序完成:
val spark = SparkSession.builder.config(conf).getOrCreate()
然后正如@SandeepPurohit 所说:
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
我希望它解决了你的问题!
P.S: SparkSession 是 Spark 2.0 中引入的新入口点,可以在 spark_sql 包 [=24] 下找到=]
//Find header from the files lying in the directory
val fileNameHeader = sc.binaryFiles("E:\sss\*.txt",1).map{
case (fileName, stream)=>
val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
(fileName, header)
}.collect().toMap
val fileNameHeaderBr = sc.broadcast(fileNameHeader)
// Now let's skip the header. mapPartition will ensure the header
// can only be the first line of the partition
sc.textFile("E:\sss\*.txt",1).mapPartitions(iter =>
if(iter.hasNext){
val firstLine = iter.next()
println(s"Comparing with firstLine $firstLine")
if(firstLine == fileNameHeaderBr.value.head._2)
new WrappedIterator(null, iter)
else
new WrappedIterator(firstLine, iter)
}
else {
iter
}
).collect().foreach(println)
class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
var isFirstIteration = true
override def hasNext: Boolean = {
if (isFirstIteration && firstLine != null){
true
}
else{
iter.hasNext
}
}
override def next(): String = {
if (isFirstIteration){
println(s"For the first time $firstLine")
isFirstIteration = false
if (firstLine != null){
firstLine
}
else{
println(s"Every time $firstLine")
iter.next()
}
}
else {
iter.next()
}
}
}
对于 python 开发人员。我用spark2.0测试过。假设您要删除前 14 行。
sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn 是 df 函数。因此,下面将不适用于上面使用的 RDD 样式。
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
在 PySpark 中,您可以使用数据框并将 header 设置为 True:
df = spark.read.csv(dataPath, header=True)
这是您传递给 read()
命令的选项:
context = new org.apache.spark.sql.SQLContext(sc)
var data = context.read.option("header","true").csv("<path>")
2018 年工作 (Spark 2.3)
Python
df = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
Scala
val myDf = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
PD1: myManualSchema 是我写的一个预定义模式,你可以跳过那部分代码
2021 年更新
相同的代码适用于 Spark 3.x
df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.format("csv")
.csv("mycsv.csv")
假设我为 Spark 上下文提供了三个文件路径以供读取,并且每个文件在第一行中都有一个架构。我们如何跳过 headers 中的模式行?
val rdd=sc.textFile("file1,file2,file3")
现在,我们如何从这个 rdd 中跳过 header 行?
您可以单独加载每个文件,使用 file.zipWithIndex().filter(_._2 > 0)
过滤它们,然后联合所有文件 RDD。
如果文件数量太多,工会可以抛出WhosebugExeption
。
如果第一条记录中只有一行 header 行,那么最有效的过滤方法是:
rdd.mapPartitionsWithIndex {
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
当然,如果有很多文件里面有很多 header 行,这就没有用了。实际上,您可以合并三个这样创建的 RDD。
您也可以只写一个 filter
,它只匹配可能是 header 的一行。这很简单,但效率较低。
Python相当于:
from itertools import islice
rdd.mapPartitionsWithIndex(
lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header) #filter out header
或者,您可以使用 spark-csv 包(或者在 Spark 2.0 中,这或多或少地以 CSV 形式提供)。请注意,这需要每个文件上的 header(如您所愿):
schema = StructType([
StructField('lat',DoubleType(),True),
StructField('lng',DoubleType(),True)])
df = sqlContext.read.format('com.databricks.spark.csv'). \
options(header='true',
delimiter="\t",
treatEmptyValuesAsNulls=True,
mode="DROPMALFORMED").load(input_file,schema=schema)
在 Spark 2.0 中,CSV reader 内置于 Spark 中,因此您可以按如下方式轻松加载 CSV 文件:
spark.read.option("header","true").csv("filePath")
使用 PySpark 中的 filter()
方法过滤掉第一列名称以删除 header:
# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)
# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)
# Check your result
for i in filterDD.take(5) : print (i)
从 Spark 2.0 开始,您可以使用 SparkSession 将其作为单行程序完成:
val spark = SparkSession.builder.config(conf).getOrCreate()
然后正如@SandeepPurohit 所说:
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
我希望它解决了你的问题!
P.S: SparkSession 是 Spark 2.0 中引入的新入口点,可以在 spark_sql 包 [=24] 下找到=]
//Find header from the files lying in the directory
val fileNameHeader = sc.binaryFiles("E:\sss\*.txt",1).map{
case (fileName, stream)=>
val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
(fileName, header)
}.collect().toMap
val fileNameHeaderBr = sc.broadcast(fileNameHeader)
// Now let's skip the header. mapPartition will ensure the header
// can only be the first line of the partition
sc.textFile("E:\sss\*.txt",1).mapPartitions(iter =>
if(iter.hasNext){
val firstLine = iter.next()
println(s"Comparing with firstLine $firstLine")
if(firstLine == fileNameHeaderBr.value.head._2)
new WrappedIterator(null, iter)
else
new WrappedIterator(firstLine, iter)
}
else {
iter
}
).collect().foreach(println)
class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
var isFirstIteration = true
override def hasNext: Boolean = {
if (isFirstIteration && firstLine != null){
true
}
else{
iter.hasNext
}
}
override def next(): String = {
if (isFirstIteration){
println(s"For the first time $firstLine")
isFirstIteration = false
if (firstLine != null){
firstLine
}
else{
println(s"Every time $firstLine")
iter.next()
}
}
else {
iter.next()
}
}
}
对于 python 开发人员。我用spark2.0测试过。假设您要删除前 14 行。
sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn 是 df 函数。因此,下面将不适用于上面使用的 RDD 样式。
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
在 PySpark 中,您可以使用数据框并将 header 设置为 True:
df = spark.read.csv(dataPath, header=True)
这是您传递给 read()
命令的选项:
context = new org.apache.spark.sql.SQLContext(sc)
var data = context.read.option("header","true").csv("<path>")
2018 年工作 (Spark 2.3)
Python
df = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
Scala
val myDf = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
PD1: myManualSchema 是我写的一个预定义模式,你可以跳过那部分代码
2021 年更新 相同的代码适用于 Spark 3.x
df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.format("csv")
.csv("mycsv.csv")