Slow Parquet write to HDFS using Spark
I am using Spark 1.6.1 and writing to HDFS. In some cases it seems as if all the work is being done by a single thread. Why is that?
Also, I need parquet.enable.summary-metadata to register the Parquet files with Impala.
Df.write().partitionBy("COLUMN").parquet(outputFileLocation);
Also, all of this seems to be happening on one CPU of a single executor.
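For reference, here is a minimal Scala sketch of the write path described above; `sc` and `df` stand in for the SparkContext and the DataFrame, the summary-metadata call only illustrates how that Parquet/Hadoop flag is typically enabled, and the column name and output path are placeholders rather than the exact code of this job:

// parquet.enable.summary-metadata is a Hadoop/Parquet setting, so it is set on
// the SparkContext's Hadoop configuration before the write.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "true")

// Dynamic partitioning: one output directory per distinct value of COLUMN.
df.write
  .partitionBy("COLUMN")
  .parquet(outputFileLocation)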
16/11/03 14:59:20 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 14:59:20 INFO mapred.SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_201611031459_0154_m_000029_0
16/11/03 15:17:56 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 41.9 GB to disk (3 times so far)
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 15:21:05 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:21:05 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
Then again:
16/11/03 15:21:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Maximum partitions reached, falling back on sorting.
16/11/03 15:32:37 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (0 time so far)
16/11/03 15:45:47 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (1 time so far)
16/11/03 15:48:44 INFO datasources.DynamicPartitionWriterContainer: Sorting complete. Writing out partition files one at a time.
16/11/03 15:48:44 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:48:44 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
The schema
Then roughly 200 lines like the following, repeated about 20 times:
16/11/03 15:48:44 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: mem size 135,903,551 > 134,217,728: flushing 1,040,100 records to disk.
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 89,688,651
Then about 200 lines like the following:
16/11/03 15:49:51 INFO hadoop.ColumnChunkPageWriteStore: written 413,231B for [a17bbfb1_2808_11e6_a4e6_77b5e8f92a4f] BINARY: 1,040,100 values, 1,138,534B raw, 412,919B comp, 8 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY], dic { 356 entries, 2,848B raw, 356B comp}
And then, finally:
16/11/03 16:15:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_201611031521_0154_m_000040_0' to hdfs://PATH/_temporary/0/task_201611031521_0154_m_000040
16/11/03 16:15:41 INFO mapred.SparkHadoopMapRedUtil: attempt_201611031521_0154_m_000040_0: Committed
16/11/03 16:15:41 INFO executor.Executor: Finished task 40.0 in stage 154.0 (TID 8545). 3757 bytes result sent to driver
Update:
Set parquet.enable.summary-metadata to false.
Reduced the partitions to 21.
Df.write().mode(SaveMode.Append).partitionBy("COL").parquet(outputFileLocation);
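Taken together, a rough Scala sketch of this update; the count of 21 and the column/path names come from the description above, and since the later update says the code still used coalesce at this point, the sketch uses coalesce rather than repartition (an assumption):

import org.apache.spark.sql.SaveMode

// Summary files are no longer needed, so stop writing _metadata/_common_metadata.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

// Cut the DataFrame down to 21 partitions and append instead of overwriting.
df.coalesce(21)
  .write
  .mode(SaveMode.Append)
  .partitionBy("COL")
  .parquet(outputFileLocation)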
It did improve the speed, but it still took an hour to complete.
Update:
The cause of most of the problem was that several left outer joins materialized very small amounts of data just before the write. Spilling occurred because append mode keeps the output files open; the default limit in this mode is 5 open files. You can increase it with the property "spark.sql.sources.maxConcurrentWrites".
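For example, a sketch assuming a Spark 1.6 SQLContext named sqlContext; the value 64 is only an illustration, not a recommendation:

// Raise the cap on concurrently open output files for dynamic-partition writes;
// the default mentioned above is 5. Larger values cost more executor memory.
sqlContext.setConf("spark.sql.sources.maxConcurrentWrites", "64")

The same key can typically also be supplied at submit time, e.g. --conf spark.sql.sources.maxConcurrentWrites=64 with spark-submit.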
Finally, after some optimizations to the code before it reaches the write stage, we got much better write times. Previously we could not repartition because the shuffle exceeded 4-5 GB. After the earlier changes I switched the code from coalesce to repartition, which spreads the data across all executors by giving roughly the same amount of data to each executor CPU. So if you see that the Parquet files created by a job differ in size, try repartitioning the DataFrame before the write.
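A minimal sketch of that change (Scala; numPartitions, the column name, and the output path are placeholders, and the sizing rule of thumb in the comment is an assumption, not something from this job):

// Before: coalesce(numPartitions) only merged existing partitions, so data stayed
// concentrated on a few executors and the output files varied in size.
// After: repartition(numPartitions) does a full shuffle that gives every executor
// core roughly the same amount of data.
val numPartitions = 21  // placeholder; often chosen close to the total executor cores
df.repartition(numPartitions)
  .write
  .partitionBy("COL")
  .parquet(outputFileLocation)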
Also, this helps with write performance:
// Disabling Parquet dictionary encoding skips building per-column dictionaries during the write, at the cost of larger files.
sc.hadoopConfiguration.set("parquet.enable.dictionary", "false")