Saving a Parquet file in Spark gives an error

We are running Spark 1.6.1 on a CDH 5.5 cluster. The job works fine with Kerberos alone, but after we enabled encryption at rest we ran into the following problem on this write:

df.write().mode(SaveMode.Append).partitionBy("Partition").parquet(path);

I have already tried setting these values, without success:

sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false"); // tried "true" as well
sparkContext.hadoopConfiguration().setInt("parquet.metadata.read.parallelism", 1);

sparkConf.set("spark.sql.parquet.mergeSchema", "false");
sparkConf.set("spark.sql.parquet.filterPushdown", "true");

Ideally I would like summary-metadata to be false, since the summary file is what gets written during the save.
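These settings can also be pushed in from outside the application code. A sketch, assuming the standard spark-submit --conf mechanism (keys prefixed with spark.hadoop. are copied into the Hadoop Configuration; the trailing arguments are placeholders for your own jar and options):

```shell
spark-submit \
  --conf spark.hadoop.parquet.enable.summary-metadata=false \
  --conf spark.sql.parquet.mergeSchema=false \
  --conf spark.sql.parquet.filterPushdown=true \
  ...
```

Doing it at submit time avoids rebuilding the jar while experimenting with combinations of these flags.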

17/01/30 18:37:54 WARN hadoop.ParquetOutputCommitter: could not write summary file for hdfs://abc
java.io.IOException: Could not read footer: java.io.IOException: Could not read footer for file FileStatus{path=hdfs://abc/Partition=O/part-r-00003-95adb09f-627f-42fe-9b89-7631226e998f.gz.parquet; isDirectory=false; length=12775; replication=3; blocksize=134217728; modification_time=1485801467817; access_time=1485801467179; owner=bigdata-service; group=bigdata; permission=rw-rw----; isSymlink=false}
at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247)
at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:262)
at org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:56)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run.apply(InsertIntoHadoopFsRelation.scala:106)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run.apply(InsertIntoHadoopFsRelation.scala:106)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at thomsonreuters.northstar.main.ParquetFileWriter.writeDataToParquet(ParquetFileWriter.java:173)
at thomsonreuters.northstar.main.SparkProcessor.process(SparkProcessor.java:128)
at thomsonreuters.northstar.main.NorthStarMain.main(NorthStarMain.java:129)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:558)
Caused by: java.io.IOException: Could not read footer for file FileStatus{path=hdfs://abc/Partition=O/part-r-00003-95adb09f-627f-42fe-9b89-7631226e998f.gz.parquet; isDirectory=false; length=12775; replication=3; blocksize=134217728; modification_time=1485801467817; access_time=1485801467179; owner=bigdata-app-ooxp-service; group=bigdata; permission=rw-rw----; isSymlink=false}
at org.apache.parquet.hadoop.ParquetFileReader.call(ParquetFileReader.java:239)
at org.apache.parquet.hadoop.ParquetFileReader.call(ParquetFileReader.java:233)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: can not read class org.apache.parquet.format.FileMetaData: Required field 'version' was not found in serialized data! Struct: FileMetaData(version:0, schema:null, num_rows:0, row_groups:null)
at org.apache.parquet.format.Util.read(Util.java:216)
at org.apache.parquet.format.Util.readFileMetaData(Util.java:73)
at org.apache.parquet.format.converter.ParquetMetadataConverter.visit(ParquetMetadataConverter.java:515)
at org.apache.parquet.format.converter.ParquetMetadataConverter.visit(ParquetMetadataConverter.java:512)
at org.apache.parquet.format.converter.ParquetMetadataConverter$NoFilter.accept(ParquetMetadataConverter.java:433)
at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:512)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:430)
at org.apache.parquet.hadoop.ParquetFileReader.call(ParquetFileReader.java:237)
... 5 more
Caused by: parquet.org.apache.thrift.protocol.TProtocolException: Required field 'version' was not found in serialized data! Struct: FileMetaData(version:0, schema:null, num_rows:0, row_groups:null)
at org.apache.parquet.format.FileMetaData.read(FileMetaData.java:881)
at org.apache.parquet.format.Util.read(Util.java:213)
... 12 more
17/01/30 18:37:54 WARN hdfs.BlockReaderFactory: I/O error constructing remote block reader.
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3101)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:755)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:670)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:854)
at org.apache.hadoop.crypto.CryptoInputStream.read(CryptoInputStream.java:176)
at org.apache.hadoop.crypto.CryptoInputStream.read(CryptoInputStream.java:649)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:66)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.parquet.hadoop.ParquetFileReader.call(ParquetFileReader.java:237)
at org.apache.parquet.hadoop.ParquetFileReader.call(ParquetFileReader.java:233)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/01/30 18:37:54 WARN hdfs.DFSClient: Failed to connect to /10.51.29.22:1004 for block, add to deadNodes and continue. java.nio.channels.ClosedByInterruptException
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3101)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:755)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:670)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:854)
at org.apache.hadoop.crypto.CryptoInputStream.read(CryptoInputStream.java:176)
at org.apache.hadoop.crypto.CryptoInputStream.read(CryptoInputStream.java:649)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:66)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.parquet.hadoop.ParquetFileReader.call(ParquetFileReader.java:237)
at org.apache.parquet.hadoop.ParquetFileReader.call(ParquetFileReader.java:233)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
(The same pair of warnings — "I/O error constructing remote block reader" and "Failed to connect ... add to deadNodes and continue" — with the identical java.nio.channels.ClosedByInterruptException stack trace then repeats for datanodes /10.51.29.217:1004 and /10.51.29.218:1004.)

It turned out to be a library conflict between open-source Spark 1.6.1 and Cloudera's build of Spark. I changed my POM file to use

Spark version: 1.6.0-cdh5.9.1

and now everything works.
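For reference, the POM change looks roughly like this. This is a sketch, not my exact file: the repository URL and artifact coordinates are the standard Cloudera ones, and the Scala 2.10 artifact suffix is an assumption — match it to your cluster:

```xml
<!-- Cloudera's Maven repository, which hosts the -cdh builds -->
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>

<dependencies>
  <!-- Scala 2.10 suffix assumed; provided scope because the cluster supplies Spark at runtime -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0-cdh5.9.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0-cdh5.9.1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
```

Compiling against the same build of Spark that the CDH cluster runs avoids mixing parquet library versions on the classpath.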

P.S.: If you run into the error below, you probably have "spark.shuffle.encryption.enabled" set to true.

Caused by: java.lang.NullPointerException
at com.intel.chimera.stream.CryptoOutputStream.<init>(CryptoOutputStream.java:124)
at com.intel.chimera.stream.CryptoOutputStream.<init>(CryptoOutputStream.java:113)
at com.intel.chimera.stream.CryptoOutputStream.<init>(CryptoOutputStream.java:102)
at com.intel.chimera.stream.CryptoOutputStream.<init>(CryptoOutputStream.java:89)
at org.apache.spark.crypto.CryptoStreamUtils$.createCryptoOutputStream(CryptoStreamUtils.scala:51)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:104)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
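If that is the case, one way to rule it out is to turn the flag off at submit time. A minimal sketch (the application class and jar are placeholders; whether disabling shuffle encryption is acceptable depends on your security requirements):

```shell
spark-submit \
  --conf spark.shuffle.encryption.enabled=false \
  --class com.example.MyApp \
  myapp.jar
```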