解压多个 *.gz 文件并在 spark scala 中制作一个 csv 文件
Unzip the multiple *.gz files and make one csv file in spark scala
我在 S3 存储桶中有多个文件,必须解压缩这些文件并将所有文件合并到一个文件 (CSV) 中,其中包含一个 header。所有文件都包含相同的 header.
数据文件如下所示。
存储系统:S3 bucket。
part-0000-XXXX.csv.gz
part_0001-YYYY.csv.gz
part-0002-ZZZZ.csv.gz
.
.
.
.
part-0010_KKKK.csv.gz.
我想要从所有文件中提取一个 CSV 文件,如上所示。请帮助我如何解压缩和合并所有文件。
将所有文件解压缩并合并成一个CSV文件后,我就可以使用这个文件与以前的文件进行数据比较..
我正在使用 spark 2.3.0 和 scala 2.11
非常感谢。
可以使用下面的代码,也可以不解压直接读取gz文件:
val filePath = "/home/harneet/<Dir where all gz/csv files are present>"
var cdnImpSchema = StructType(Array(
StructField("idate", TimestampType, true),
StructField("time", StringType, true),
StructField("anyOtherColumn", StringType, true)
))
var cdnImpDF = spark.read.format("csv"). // Use "csv" regardless of TSV or CSV.
option("delimiter", ","). // Set delimiter to tab or comma or whatever you want.
schema(cdnImpSchema). // Schema that was built above.
load(filePath)
cdnImpDF.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata.csv")
repartition(1) -> 将生成一个文件作为输出。
下面提到的代码似乎工作正常。
scala> val rdd = sc.textFile("/root/data")
rdd: org.apache.spark.rdd.RDD[String] = /root/data MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd.coalesce(1).saveAsTextFile("/root/combinedCsv", classOf[org.apache.hadoop.io.compress.GzipCodec])
您可以看到输入数据在 /root/data
目录中,gzip 格式的组合 csv 存储在 /root/combinedCsv
目录中。
更新
如果要以csv格式存储数据,去掉GzipCodec部分。
scala> rdd.coalesce(1).saveAsTextFile("/root/combinedCsv")
我在 S3 存储桶中有多个文件,必须解压缩这些文件并将所有文件合并到一个文件 (CSV) 中,其中包含一个 header。所有文件都包含相同的 header.
数据文件如下所示。
存储系统:S3 bucket。
part-0000-XXXX.csv.gz
part_0001-YYYY.csv.gz
part-0002-ZZZZ.csv.gz
.
.
.
.
part-0010_KKKK.csv.gz.
我想要从所有文件中提取一个 CSV 文件,如上所示。请帮助我如何解压缩和合并所有文件。
将所有文件解压缩并合并成一个CSV文件后,我就可以使用这个文件与以前的文件进行数据比较..
我正在使用 spark 2.3.0 和 scala 2.11
非常感谢。
可以使用下面的代码,也可以不解压直接读取gz文件:
val filePath = "/home/harneet/<Dir where all gz/csv files are present>"
var cdnImpSchema = StructType(Array(
StructField("idate", TimestampType, true),
StructField("time", StringType, true),
StructField("anyOtherColumn", StringType, true)
))
var cdnImpDF = spark.read.format("csv"). // Use "csv" regardless of TSV or CSV.
option("delimiter", ","). // Set delimiter to tab or comma or whatever you want.
schema(cdnImpSchema). // Schema that was built above.
load(filePath)
cdnImpDF.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata.csv")
repartition(1) -> 将生成一个文件作为输出。
下面提到的代码似乎工作正常。
scala> val rdd = sc.textFile("/root/data")
rdd: org.apache.spark.rdd.RDD[String] = /root/data MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd.coalesce(1).saveAsTextFile("/root/combinedCsv", classOf[org.apache.hadoop.io.compress.GzipCodec])
您可以看到输入数据在 /root/data
目录中,gzip 格式的组合 csv 存储在 /root/combinedCsv
目录中。
更新
如果要以csv格式存储数据,去掉GzipCodec部分。
scala> rdd.coalesce(1).saveAsTextFile("/root/combinedCsv")