使用 Spark 将 csv.gz 个文件转换为 Parquet
Convert csv.gz files into Parquet using Spark
我需要使用 Spark(首选 Scala)将 AWS S3 和 HDFS 中文件夹中的 csv.gz 文件转换为 Parquet 文件。数据的一列是时间戳,我只有一周的数据集。时间戳格式为:
'yyyy-MM-dd hh:mm:ss'
我想要的输出是,对于每一天,都有一个文件夹(或分区),其中包含该特定日期的 Parquet 文件。所以会有 7 个输出文件夹或分区。
我对如何做到这一点只有模糊的想法,我脑子里只有 sc.textFile。 Spark 中是否有可以转换为 Parquet 的函数?我如何在 S3 和 HDFS 中实现它?
谢谢你的帮助。
如果您查看 Spark Dataframe API, and the Spark-CSV package,这将实现您想要做的大部分工作 - 将 CSV 文件读入数据帧,然后将数据帧写成镶木地板,这样您就可以了大部分路都在那里。
您仍然需要执行一些步骤来解析时间戳并使用结果对数据进行分区。
读取 csv 文件/user/hduser/wikipedia/pageviews-by-second-tsv
"timestamp" "site" "requests"
"2015-03-16T00:09:55" "mobile" 1595
"2015-03-16T00:10:39" "mobile" 1544
以下代码使用spark2.0
import org.apache.spark.sql.types._
var wikiPageViewsBySecondsSchema = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
var wikiPageViewsBySecondsDF = spark.read.schema(wikiPageViewsBySecondsSchema).option("header", "true").option("delimiter", "\t").csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
将字符串时间戳转换为时间戳
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.withColumn("timestampTS", $"timestamp".cast("timestamp")).drop("timestamp")
or
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.select($"timestamp".cast("timestamp"), $"site", $"requests")
写入 parquet 文件。
wikiPageViewsBySecondsTableDF.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
老话题,但我认为如果回答不正确,即使是老话题也很重要。
在 spark 版本 >=2 之前已经包含 csv 包,您需要将 databricks csv 包导入到您的工作中,例如“--包 com.databricks:spark-csv_2.10:1.5.0”.
csv 示例:
id,name,date
1,pete,2017-10-01 16:12
2,paul,2016-10-01 12:23
3,steve,2016-10-01 03:32
4,mary,2018-10-01 11:12
5,ann,2018-10-02 22:12
6,rudy,2018-10-03 11:11
7,mike,2018-10-04 10:10
首先需要创建hivetable,这样spark写入的数据才能与hive schema兼容。 (在未来的版本中可能不再需要)
创建table:
create table part_parq_table (
id int,
name string
)
partitioned by (date string)
stored as parquet
完成后,您可以轻松读取 csv 并将数据框保存到 table.The 第二步用 "yyyy-mm-dd" 等日期格式覆盖日期列。将为每个值创建一个文件夹,其中包含特定行。
SCALA Spark-Shell 示例:
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
前两行是创建一个尚不存在的分区文件夹所需的配置单元配置。
var df=spark.read.format("csv").option("header","true").load("/tmp/test.csv")
df=df.withColumn("date",substring(col("date"),0,10))
df.show(false)
df.write.format("parquet").mode("append").insertInto("part_parq_table")
插入完成后可以像"select * from part_parq_table"一样直接查询table。
这些文件夹将在默认 cloudera 的 table 文件夹中创建,例如hdfs:///users/hive/warehouse/part_parq_table
希望对您有所帮助
BR
我需要使用 Spark(首选 Scala)将 AWS S3 和 HDFS 中文件夹中的 csv.gz 文件转换为 Parquet 文件。数据的一列是时间戳,我只有一周的数据集。时间戳格式为:
'yyyy-MM-dd hh:mm:ss'
我想要的输出是,对于每一天,都有一个文件夹(或分区),其中包含该特定日期的 Parquet 文件。所以会有 7 个输出文件夹或分区。
我对如何做到这一点只有模糊的想法,我脑子里只有 sc.textFile。 Spark 中是否有可以转换为 Parquet 的函数?我如何在 S3 和 HDFS 中实现它?
谢谢你的帮助。
如果您查看 Spark Dataframe API, and the Spark-CSV package,这将实现您想要做的大部分工作 - 将 CSV 文件读入数据帧,然后将数据帧写成镶木地板,这样您就可以了大部分路都在那里。
您仍然需要执行一些步骤来解析时间戳并使用结果对数据进行分区。
读取 csv 文件/user/hduser/wikipedia/pageviews-by-second-tsv
"timestamp" "site" "requests"
"2015-03-16T00:09:55" "mobile" 1595
"2015-03-16T00:10:39" "mobile" 1544
以下代码使用spark2.0
import org.apache.spark.sql.types._
var wikiPageViewsBySecondsSchema = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
var wikiPageViewsBySecondsDF = spark.read.schema(wikiPageViewsBySecondsSchema).option("header", "true").option("delimiter", "\t").csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
将字符串时间戳转换为时间戳
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.withColumn("timestampTS", $"timestamp".cast("timestamp")).drop("timestamp")
or
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.select($"timestamp".cast("timestamp"), $"site", $"requests")
写入 parquet 文件。
wikiPageViewsBySecondsTableDF.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
老话题,但我认为如果回答不正确,即使是老话题也很重要。
在 spark 版本 >=2 之前已经包含 csv 包,您需要将 databricks csv 包导入到您的工作中,例如“--包 com.databricks:spark-csv_2.10:1.5.0”.
csv 示例:
id,name,date
1,pete,2017-10-01 16:12
2,paul,2016-10-01 12:23
3,steve,2016-10-01 03:32
4,mary,2018-10-01 11:12
5,ann,2018-10-02 22:12
6,rudy,2018-10-03 11:11
7,mike,2018-10-04 10:10
首先需要创建hivetable,这样spark写入的数据才能与hive schema兼容。 (在未来的版本中可能不再需要)
创建table:
create table part_parq_table (
id int,
name string
)
partitioned by (date string)
stored as parquet
完成后,您可以轻松读取 csv 并将数据框保存到 table.The 第二步用 "yyyy-mm-dd" 等日期格式覆盖日期列。将为每个值创建一个文件夹,其中包含特定行。
SCALA Spark-Shell 示例:
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
前两行是创建一个尚不存在的分区文件夹所需的配置单元配置。
var df=spark.read.format("csv").option("header","true").load("/tmp/test.csv")
df=df.withColumn("date",substring(col("date"),0,10))
df.show(false)
df.write.format("parquet").mode("append").insertInto("part_parq_table")
插入完成后可以像"select * from part_parq_table"一样直接查询table。 这些文件夹将在默认 cloudera 的 table 文件夹中创建,例如hdfs:///users/hive/warehouse/part_parq_table
希望对您有所帮助 BR