Uploading File from HDFS to S3 using FileUtil.copy in Spark causing DiskErrorException: Directory is not writable error?
I am trying to write a parquet file to HDFS and then copy it to S3.
I wrote the code in Zeppelin and it runs fine there: no issues, and the files show up at the S3 path.
import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, dayofmonth, month, to_date, year}

var outputFolder = "buckent_name/path"
println("\n ---- TASK 1 ----- \n writing with path " + outputFolder)

wholeParquetFile
  .withColumn("date_col", to_date(col("timestamp"), "YYYYMMdd"))
  .withColumn("year", year(col("date_col")))
  .withColumn("month", month(col("date_col")))
  .withColumn("day", dayofmonth(col("date_col")))
  .drop("date_col")
  .repartition(1)
  .write.mode(SaveMode.Overwrite)
  .partitionBy("year", "month", "day")
  .parquet(outputFolder)

val sc = spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)

val allTheFilesThatBennCreated: Array[FileStatus] =
  fs.globStatus(new Path(outputFolder + "/year=*/month=*/day=*/*"))
println("------- allTheFilesThatBennCreated -------" + allTheFilesThatBennCreated.mkString("Array(", ", ", ")"))

// right now the file path will be outputFile + "/year=2021/month=5/day=17/part-....c000.snappy.parquet"
// converting it to outputFile + "/2021/5/17/part-....c000.snappy.parquet"
allTheFilesThatBennCreated.foreach(path => {
  val newPathString = generateOutputFilePathString(path.getPath.toString)
  val outputFilePath = new Path(newPathString)
  val destinationFileSystem = FileSystem.get(outputFilePath.toUri, sc.hadoopConfiguration)
  val sourceFileSystem = FileSystem.get(path.getPath.toUri, sc.hadoopConfiguration)

  println("-------- source filesystem ------------------" + sourceFileSystem)
  println("-------- path.getPath --------------" + path.getPath)
  println("-------- destinationFileSystem ------------- " + destinationFileSystem)
  println("-------- S3 path for Output File ------------" + outputFilePath)

  // uploading to s3 from hdfs
  FileUtil.copy(sourceFileSystem, path.getPath, destinationFileSystem, outputFilePath, true, sc.hadoopConfiguration)
})
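(generateOutputFilePathString is not shown in the question; a minimal sketch of what it might do, reconstructed only from the comment above about turning ".../year=2021/month=5/day=17/..." into ".../2021/5/17/...", could look like the following. The exact bucket/prefix rewrite is elided in the question, so this is purely illustrative.)

// Hypothetical helper, assumed from the comment in the code above:
// strips the Hive-style partition prefixes from the directory names.
def generateOutputFilePathString(sourcePath: String): String = {
  sourcePath
    .replace("/year=", "/")
    .replace("/month=", "/")
    .replace("/day=", "/")
  // the real helper presumably also swaps the hdfs:// prefix for the
  // s3a:// bucket/prefix of the destination, which the question omits
}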
But when I run the same code from spark-shell, or from a jar submitted with spark-submit, I get this error:
22/05/17 09:57:28 WARN LocalDirAllocator$AllocatorPerContext: /mnt/var/lib/hadoop/tmp/s3a is not writable
org.apache.hadoop.util.DiskChecker$DiskErrorException: Directory is not writable: /mnt/var/lib/hadoop/tmp/s3a
at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:167)
at org.apache.hadoop.util.DiskChecker.checkDirInternal(DiskChecker.java:100)
at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:77)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:315)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.propellyr.driver.ApplicationMain$$anonfun.apply(ApplicationMain.scala:86)
at com.propellyr.driver.ApplicationMain$$anonfun.apply(ApplicationMain.scala:70)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain(ApplicationMain.scala:70)
at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:938)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:400)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.propellyr.driver.ApplicationMain$$anonfun.apply(ApplicationMain.scala:86)
at com.propellyr.driver.ApplicationMain$$anonfun.apply(ApplicationMain.scala:70)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain(ApplicationMain.scala:70)
at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:938)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Does anyone know how to solve this?
You haven't said anything about your Zeppelin and CLI environments: whether both submit to the same cluster, or whether your CLI run uses local mode.
Still, the clue is in your stack trace:
No space available in any of the local directories.
Digging further, the error is in FileUtil.copy(): it tries to write temporary output to a path configured by the mapred.local.dir property, which is something you can check.
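The stack trace also shows the temporary file being allocated by S3AFileSystem.createTmpFileForWrite, whose buffer location is controlled by fs.s3a.buffer.dir (it defaults to ${hadoop.tmp.dir}/s3a, which matches the /mnt/var/lib/hadoop/tmp/s3a path in the warning). A minimal sketch of how you might inspect these directories and, assuming a writable path such as /tmp/s3a exists on the nodes, redirect the S3A buffer there (paths are illustrative, not from the original post):

val hadoopConf = spark.sparkContext.hadoopConfiguration

// Inspect the directories the local allocators may use
println("hadoop.tmp.dir    = " + hadoopConf.get("hadoop.tmp.dir"))
println("fs.s3a.buffer.dir = " + hadoopConf.get("fs.s3a.buffer.dir"))
println("mapred.local.dir  = " + hadoopConf.get("mapred.local.dir"))

// Point the S3A buffer at a directory the job's user can actually write to.
// Note: set this before the first s3a:// FileSystem is created, because
// FileSystem instances are cached with the configuration they were
// initialized with.
hadoopConf.set("fs.s3a.buffer.dir", "/tmp/s3a")  // illustrative writable path

The same setting can be passed at launch time with spark-submit --conf spark.hadoop.fs.s3a.buffer.dir=/tmp/s3a, or you can instead fix the permissions and free space on /mnt/var/lib/hadoop/tmp so the existing directory becomes writable.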