Uploading File from HDFS to S3 using FileUtil.copy in Spark causing DiskErrorException: Directory is not writable error?

I am trying to write a parquet file to HDFS and then copy it to S3.

I wrote the code in Zeppelin and it runs fine: it writes the files to the S3 path without any issues.

  import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.functions.{col, dayofmonth, month, to_date, year}

  // write the dataset to HDFS, partitioned by year/month/day
  var outputFolder = "bucket_name/path"
  println("\n ---- TASK 1 ----- \n writing with path " + outputFolder)
  wholeParquetFile
    .withColumn("date_col", to_date(col("timestamp"), "yyyyMMdd"))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop("date_col")
    .repartition(1)
    .write.mode(SaveMode.Overwrite)
    .partitionBy("year", "month", "day")
    .parquet(outputFolder)


  val sc = spark.sparkContext
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val allTheFilesThatBennCreated: Array[FileStatus] = fs.globStatus(new Path(outputFolder + "/year=*/month=*/day=*/*"))

  println("------- allTheFilesThatBennCreated   -------" + allTheFilesThatBennCreated.mkString("Array(", ", ", ")"))

  // right now each file path is     outputFolder + "/year=2021/month=5/day=17/part-....c000.snappy.parquet"
  // converting it to                outputFolder + "/2021/5/17/part-....c000.snappy.parquet"
  // (a hypothetical sketch of generateOutputFilePathString follows the code block)
  allTheFilesThatBennCreated.foreach(path => {

    val newPathString = generateOutputFilePathString(path.getPath.toString)
    val outputFilePath = new Path(newPathString)
    val destinationFileSystem = FileSystem.get(outputFilePath.toUri, sc.hadoopConfiguration)
    val sourceFileSystem = FileSystem.get(path.getPath.toUri, sc.hadoopConfiguration)

    println("-------- source filesystem ------------------" + sourceFileSystem)

    println("-------- path.getPath --------------" + path.getPath)

    println("-------- destinationFileSystem ------------- " + destinationFileSystem)

    println("-------- S3 path for Output File ------------" + outputFilePath)

    // uploading to s3 from hdfs (the boolean argument is deleteSource, so the HDFS copy is removed)
    FileUtil.copy(sourceFileSystem, path.getPath, destinationFileSystem, outputFilePath, true, sc.hadoopConfiguration)
  })
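
For reference, generateOutputFilePathString is not shown here; a minimal sketch of what it presumably does (dropping the Hive-style year=/month=/day= key prefixes and re-rooting the file under the S3 destination; the bucket and prefix below are placeholders) could look like this:

  // Hypothetical sketch only: strip the partition key prefixes from the HDFS path
  // and prepend the S3 destination, e.g.
  //   ".../year=2021/month=5/day=17/part-...parquet" -> "s3a://bucket_name/path/2021/5/17/part-...parquet"
  def generateOutputFilePathString(hdfsPath: String): String = {
    val relative = hdfsPath
      .substring(hdfsPath.indexOf("/year="))     // keep only the partition part onwards
      .replaceAll("(year|month|day)=", "")       // drop the partition key prefixes
    "s3a://bucket_name/path" + relative          // placeholder destination bucket/prefix
  }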

But when I run the same code from spark-shell, or from a jar file via spark-submit, I get this error.

22/05/17 09:57:28 WARN LocalDirAllocator$AllocatorPerContext: /mnt/var/lib/hadoop/tmp/s3a is not writable

org.apache.hadoop.util.DiskChecker$DiskErrorException: Directory is not writable: /mnt/var/lib/hadoop/tmp/s3a
    at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:167)
    at org.apache.hadoop.util.DiskChecker.checkDirInternal(DiskChecker.java:100)
    at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:77)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:315)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
    at com.propellyr.driver.ApplicationMain$$anonfun.apply(ApplicationMain.scala:86)
    at com.propellyr.driver.ApplicationMain$$anonfun.apply(ApplicationMain.scala:70)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain(ApplicationMain.scala:70)
    at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main.apply(App.scala:76)
    at scala.App$$anonfun$main.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
    at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
    at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:938)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:400)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
    at com.propellyr.driver.ApplicationMain$$anonfun.apply(ApplicationMain.scala:86)
    at com.propellyr.driver.ApplicationMain$$anonfun.apply(ApplicationMain.scala:70)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain(ApplicationMain.scala:70)
    at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main.apply(App.scala:76)
    at scala.App$$anonfun$main.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
    at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
    at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:938)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Does anyone know how to fix this?

You haven't said anything about your Zeppelin and CLI environments, i.e. whether they both submit to the same cluster or whether your CLI run uses local mode.

Regardless, the clue is in your stack trace:

No space available in any of the local directories.

Digging in further, the error comes from FileUtil.copy(), which tries to write its temporary output to a local path configured by the mapred.local.dir property; that is something you can check.
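
For the S3A connector specifically, the temporary buffer directory is usually taken from fs.s3a.buffer.dir, which defaults to ${hadoop.tmp.dir}/s3a; that matches the /mnt/var/lib/hadoop/tmp/s3a path in your stack trace. A minimal sketch of how you could inspect and, if needed, repoint it (the /tmp/s3a path is only a placeholder):

  // Sketch: check which local buffer directory S3A will use, and repoint it to a
  // directory the submitting user can actually write to ("/tmp/s3a" is a placeholder).
  val hadoopConf = spark.sparkContext.hadoopConfiguration
  println("fs.s3a.buffer.dir = " + hadoopConf.get("fs.s3a.buffer.dir"))
  println("hadoop.tmp.dir    = " + hadoopConf.get("hadoop.tmp.dir"))
  println("mapred.local.dir  = " + hadoopConf.get("mapred.local.dir"))
  hadoopConf.set("fs.s3a.buffer.dir", "/tmp/s3a")  // set before the s3a FileSystem is first created

The same can be done at submit time with --conf spark.hadoop.fs.s3a.buffer.dir=/tmp/s3a (the spark.hadoop. prefix is copied into the Hadoop configuration), or by making the existing directory writable by the user running spark-submit, which is likely the real difference between your CLI runs and the Zeppelin run.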