将 Parquet 文件从 Spark RDD 写入动态文件夹
Write Parquet files from Spark RDD to dynamic folders
给出以下代码片段(Spark 版本:1.5.2):
rdd.toDF().write.mode(SaveMode.Append).parquet(pathToStorage)
将 RDD 数据保存到扁平化的 Parquet 文件中,我希望我的存储结构如下:
country/
year/
yearmonth/
yearmonthday/
数据本身包含一个国家列和一个时间戳列,所以我从 开始。但是,由于我的数据中只有一个时间戳,所以我无法按 year/yearmonth/yearmonthday 对整个数据进行分区,因为它们本身不是列...
而且 this solution 看起来还不错,只是我无法将其调整为 Parquet 文件...
有什么想法吗?
我明白了。为了将路径动态链接到 RDD,首先必须从 rdd 创建一个元组:
rdd.map(model => (model.country, model))
然后,所有记录都必须被解析,以检索不同的国家:
val countries = rdd.map {
case (country, model) => country
}
.distinct()
.collect()
既然知道了国家,就可以根据不同的国家来写记录了:
countries.map {
country => {
val countryRDD = rdd.filter {
case (c, model) => c == country
}
.map(_._2)
countryRDD.toDF().write.parquet(pathToStorage + "/" + country)
}
}
当然,整个集合要解析两次,但这是我目前找到的唯一解决方案。
关于时间戳,您只需对三元组执行相同的过程(第三个类似于 20160214
);我最终选择了当前时间戳。
给出以下代码片段(Spark 版本:1.5.2):
rdd.toDF().write.mode(SaveMode.Append).parquet(pathToStorage)
将 RDD 数据保存到扁平化的 Parquet 文件中,我希望我的存储结构如下:
country/
year/
yearmonth/
yearmonthday/
数据本身包含一个国家列和一个时间戳列,所以我从
而且 this solution 看起来还不错,只是我无法将其调整为 Parquet 文件...
有什么想法吗?
我明白了。为了将路径动态链接到 RDD,首先必须从 rdd 创建一个元组:
rdd.map(model => (model.country, model))
然后,所有记录都必须被解析,以检索不同的国家:
val countries = rdd.map {
case (country, model) => country
}
.distinct()
.collect()
既然知道了国家,就可以根据不同的国家来写记录了:
countries.map {
country => {
val countryRDD = rdd.filter {
case (c, model) => c == country
}
.map(_._2)
countryRDD.toDF().write.parquet(pathToStorage + "/" + country)
}
}
当然,整个集合要解析两次,但这是我目前找到的唯一解决方案。
关于时间戳,您只需对三元组执行相同的过程(第三个类似于 20160214
);我最终选择了当前时间戳。