How to merge part files in HDFS?
What I want
I have 17 TB of date-partitioned data in directories like this:
/data_folder
    /date=2021.01.01
        /part-00002-f0b91523-6e0c-4adc-88cc-e9451614791d.c000.snappy.parquet
        /part-00002-f0193442-c20e-49d2-bde1-70053ae2a254.c000.snappy.parquet
        /... over 9000 part files
    /date=2021.01.02
        /part-00002-bdb50c33-fd32-4e87-9edb-cec77973760b.c000.snappy.parquet
        /part-00001-e2cd906e-5669-46d7-92e9-7498ed60487f.c000.snappy.parquet
        /... over 9000 part files
I want it to look like this:
/data_folder
    /date=2021.01.01
        /merge.parquet
    /date=2021.01.02
        /merge.parquet
I want this because I've heard that HDFS is better suited to storing a small number of large files than a large number of small files. Right now my queries have become slow, and I'm hoping this optimization will speed them up.
What I did
So I ran commands like these:
hdfs dfs -getmerge /data_folder/date=2021.01.01 merge.parquet;
hdfs dfs -copyFromLocal -f -t 4 merge.parquet /merged/date=2021.01.01/merge.parquet;
I got the directory structure I wanted, but now I can't read the files. The query:
%spark2.spark
val date = "2021.01.01"
val ofdCheques2Uniq = spark.read
.parquet(s"/projects/khajiit/data/OfdCheques2/date=$date")
.withColumn("chequeId", concat($"content.cashboxRegNumber", lit("_"), $"content.number", lit("_"), col("content.timestamp")))
.dropDuplicates("chequeId")
val ofdChequesTempUniq = spark.read
.parquet(s"/projects/khajiit/data/OfdChequesTemp/date=$date")
.withColumn("chequeId", concat($"content.cashboxRegNumber", lit("_"), $"content.number", lit("_"), col("content.timestamp")))
.dropDuplicates("chequeId")
println(s"OfdCheques2 : ${ofdCheques2Uniq.count} unique cheques")
println(s"OfdChequesTemp: ${ofdChequesTempUniq.count} unique cheques")
prints:
OfdCheques2 : 4309 unique cheques
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 74.0 failed 4 times, most recent failure: Lost task 0.3 in stage 74.0 (TID 1720, srs-st-hdp-s3.dev.kontur.ru, executor 1): java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 13
Meanwhile, a query like this:
val ofdCheques2Uniq = spark.read
.parquet(s"/projects/khajiit/data/OfdCheques2/date=$date")
val ofdChequesTempUniq = spark.read
.parquet(s"/projects/khajiit/data/OfdChequesTemp/date=$date")
println(s"OfdCheques2 : ${ofdCheques2Uniq.count} unique cheques")
println(s"OfdChequesTemp: ${ofdChequesTempUniq.count} unique cheques")
prints:
OfdCheques2 : 5290 unique cheques
OfdChequesTemp: 18 unique cheques
Finally, the questions
- Is the getmerge command applicable to my problem? If so, what am I doing wrong?
- What is the best way to solve this problem?
"I got the directory structure I wanted, but now I can't read the files"
This is due to the binary structure of Parquet files. They have header/footer metadata that stores the schema and the number of records in the file... getmerge is therefore really only useful for row-delimited, non-binary data formats.
What you can do is spark.read.parquet("/data_folder"), then repartition or coalesce that DataFrame, and then write it out to a new "merged" output location.
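A minimal sketch of that approach in the same Spark shell, assuming the layout from the question; the output path /data_folder_merged and repartitioning by the date column are illustrative choices, not something the answer prescribes:
// Read the whole dataset: the date=... directories become a "date" partition column.
val df = spark.read.parquet("/data_folder")

df.repartition($"date")          // roughly one output file per date
  .write
  .partitionBy("date")           // recreate the date=.../ directory layout
  .mode("overwrite")
  .parquet("/data_folder_merged")
Because Spark rewrites every partition as complete Parquet files (footer included), the result stays readable, unlike the byte-concatenated getmerge output; if one file per date turns out too large, reading each date separately and applying coalesce(n) before writing is an alternative.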
Another option is Gobblin - https://gobblin.apache.org/docs/user-guide/Compaction/