使用 Spark 将 csv.gz 个文件转换为 Parquet

Convert csv.gz files into Parquet using Spark

我需要使用 Spark(首选 Scala)将 AWS S3 和 HDFS 中文件夹中的 csv.gz 文件转换为 Parquet 文件。数据的一列是时间戳,我只有一周的数据集。时间戳格式为:

'yyyy-MM-dd hh:mm:ss'

我想要的输出是,对于每一天,都有一个文件夹(或分区),其中包含该特定日期的 Parquet 文件。所以会有 7 个输出文件夹或分区。

我对如何做到这一点只有模糊的想法,我脑子里只有 sc.textFile。 Spark 中是否有可以转换为 Parquet 的函数?我如何在 S3 和 HDFS 中实现它?

谢谢你的帮助。

如果您查看 Spark Dataframe API, and the Spark-CSV package,这将实现您想要做的大部分工作 - 将 CSV 文件读入数据帧,然后将数据帧写成镶木地板,这样您就可以了大部分路都在那里。

您仍然需要执行一些步骤来解析时间戳并使用结果对数据进行分区。

读取 csv 文件/user/hduser/wikipedia/pageviews-by-second-tsv

"timestamp"             "site"  "requests"
"2015-03-16T00:09:55"   "mobile"        1595
"2015-03-16T00:10:39"   "mobile"        1544

以下代码使用spark2.0

import org.apache.spark.sql.types._
var wikiPageViewsBySecondsSchema = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
var wikiPageViewsBySecondsDF = spark.read.schema(wikiPageViewsBySecondsSchema).option("header", "true").option("delimiter", "\t").csv("/user/hduser/wikipedia/pageviews-by-second-tsv")

将字符串时间戳转换为时间戳

wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.withColumn("timestampTS", $"timestamp".cast("timestamp")).drop("timestamp")
or 
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.select($"timestamp".cast("timestamp"), $"site", $"requests") 

写入 parquet 文件。

wikiPageViewsBySecondsTableDF.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")

老话题,但我认为如果回答不正确,即使是老话题也很重要。

在 spark 版本 >=2 之前已经包含 csv 包,您需要将 databricks csv 包导入到您的工作中,例如“--包 com.databricks:spark-csv_2.10:1.5.0”.

csv 示例:

id,name,date
1,pete,2017-10-01 16:12
2,paul,2016-10-01 12:23
3,steve,2016-10-01 03:32
4,mary,2018-10-01 11:12 
5,ann,2018-10-02 22:12
6,rudy,2018-10-03 11:11
7,mike,2018-10-04 10:10

首先需要创建hivetable,这样spark写入的数据才能与hive schema兼容。 (在未来的版本中可能不再需要)

创建table:

create table part_parq_table (
    id int,
    name string
    )
partitioned by (date string)
stored as parquet

完成后,您可以轻松读取 csv 并将数据框保存到 table.The 第二步用 "yyyy-mm-dd" 等日期格式覆盖日期列。将为每个值创建一个文件夹,其中包含特定行。

SCALA Spark-Shell 示例:

spark.sqlContext.setConf("hive.exec.dynamic.partition", "true") 
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

前两行是创建一个尚不存在的分区文件夹所需的配置单元配置。

var df=spark.read.format("csv").option("header","true").load("/tmp/test.csv")
df=df.withColumn("date",substring(col("date"),0,10))
df.show(false)
df.write.format("parquet").mode("append").insertInto("part_parq_table")

插入完成后可以像"select * from part_parq_table"一样直接查询table。 这些文件夹将在默认 cloudera 的 table 文件夹中创建,例如hdfs:///users/hive/warehouse/part_parq_table

希望对您有所帮助 BR