使用 Spark Job 从 shared/NFS 安装位置将 file/data 放入 HDFS,出现间歇性问题

Using Spark Job for putting file/data into HDFS from shared/NFS mounted location giving intermittent issues

我有一个使用 Yarn 作为集群管理器的三节点 Spark 集群 [运行 在三节点 hadoop 集群上。

考虑一下,我的 Hadoop 集群有三个节点 [Master、Slave1 和 Slave2] Resourcemanager 在 Master 上是 运行,NodaManager 在 Slave1 和 Slave2 上。 Spark Cluster 也存在于三个节点上。

在主节点上,我创建了一个文件夹 /data/nfsshare,我将其作为 /nfsshare 安装在 Slave1 和 Slave2 上。现在我在 /data/nfsshare 文件夹中保存了一个文件 abc.txt,在 /nfsshare 位置对 slave1 和 slave2 都可见。

我创建了一个小型 spark 作业,用于将 abc.txt 从 /data/nfsshare 位置复制到 HDFS,还执行字数统计并将其结果保存在 HDFS 中。

def write2HDFS(args:Array[String]){

val  source = args(0)
val  destination = args(1)   
val processedDataDestination = args(2)
val conf = new SparkConf().setAppName("WCRemoteReadHDFSWrite").set("spark.hadoop.validateOutputSpecs", "true");
val sc = new SparkContext(conf)

logger.info("STARTING READ")

val rdd = sc.textFile(source)

logger.info("READING DONE")
logger.info("STARTING WRITE")
logger.info("rdd.toDebugString >>>>>> "+rdd.toDebugString)
logger.info("rdd.getNumPartitions >>>>>>>>" +rdd.getNumPartitions)
// rdd.coalesce(1)
// logger.info("rdd.getNumPartitions after coalesce(1) >>>>>>>>" +rdd.getNumPartitions)

rdd.saveAsTextFile(destination)


logger.info("DONE")
rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_).saveAsTextFile(processedDataDestination) 

sc.stop}

当我尝试使用命令执行此代码时:

./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar file:///data/nfsshare/abc.txt hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWrite1MB hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWriteP1MB

我遇到以下间歇性问题:

1) InputPath 不存在:文件:/data/nfsshare/abc.txt,在此作业的某些运行期间间歇性出现 [而文件存在于共享 location/mounted 路径]

2) Sometimes/intermittently 作业状态为失败,但正在使用所需数据创建输出目录

3) 输出目录已经存在:有时 HDFS 输出目录存在问题即将到来 --> 这通过增加执行程序和驱动程序内存得到解决

-->我在集群和客户端部署模式下都尝试了 运行 这个作业,但在这两种情况下我都遇到了同样的问题。

我不确定 Master 的共享位置路径 /data/nfsshare 和 slave 的 /nfsshare 是否有任何区别?因为在命令行我将 /data/nfsshare 作为文件路径位置传递,因此每当从属上的任何执行程序 运行 寻找 /data/nfsshare 都会失败。

我在所有三个节点上都尝试了 运行 这个作业,但这些间歇性问题仍然存在。

如有任何专家建议,我们将不胜感激。

如果有任何其他更好的方法可以将文件从任何暂存 area/mounted 位置放入 HDFS,那么也请分享它。

此致, 布佩什

  1. 最好将您的输入文件上传到没有 spark 的 hdfs,只需使用 hdfs dfs -copyFromLocal 将其上传到 hdfs 或者您可以尝试使用 hdfs 客户端库上传它,但单线程没有 spark api .通常假设输入数据已经在分布式文件系统(s3、hdfs 等)中。使用 nfs 时,您可能会看到各种效果。所以从设计的角度来看,管道的某些部分将数据放入 s3/hdfs,然后才开始进行 spark 的并行处理。
  2. 是的,如果你 运行 一遍又一遍地做同样的工作,你应该清理你的输出目录,我认为有 spark 配置允许你禁用这个验证,但是更好地设计你的应用程序来编写每次都进入新路径

Q1, Q2) 你提到你 nfs 将目录从本地 /data/nfsshare 挂载到 HDFS 上的 /nfsshare。如果您已成功完成此操作并且已验证它正在运行,为什么不将其用作您的输入路径?

当您尝试使用本地文件系统时,在 YARN 模式下事情会变得有些棘手。如果您使用的是分布式计算,最好将您的输入保存在 HDFS 中。所以,你的 spark-submit 命令变成,

./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar /nfsshare/path /user/hduser/hdfsWrite1MB /user/hduser/hdfsWriteP1MB

注意我省略了hdfs://,这是因为Spark环境默认的文件系统是HDFS

Q3) 输出目录已存在:您可以在保存文件之前执行此操作,如 here

所述
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://host:port/"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(/path/to/output), true) } 
catch { case _ : Throwable => { } }

或者如果您不想一次又一次地删除内容,您可以简单地将当前时间戳附加到您的输出路径。只有在处理 RDD 时,Dataframe API 才能覆盖现有路径。

PS: 您的 Q1 显示输入路径为 file:/data/nfsshare/test-1MB 而在 spark-submit 命令中输入显示 file:///data/nfsshare/abc.txtabc.txt 是目录吗?

如果有帮助请告诉我。干杯。

实际上,由于安装的文件夹名称,我间歇性地面临 InputPath Doesnt exist: file:/data/nfsshare/abc.txt。一旦我在所有节点上保持相同的名称 [/data/nfsshare]。此问题已解决。

我假设当我 运行 在集群模式下运行我的 spark 作业时,YARN 正在决定 运行 驱动程序和执行程序的位置,因此如果所有执行程序都是 运行主节点上的 ning [从 /data/nfsshare] 可见,作业工作正常,而对于此路径作为 /nfsshare 的其他执行程序,此抛出路径相关问题。一旦路径问题得到解决,所有执行者都能够看到文件路径为 /data/nfsshare

此外,对于已经存在的输出目录,Chitral Verma 的代码片段有所帮助。