使用 Spark Job 从 shared/NFS 安装位置将 file/data 放入 HDFS,出现间歇性问题
Using Spark Job for putting file/data into HDFS from shared/NFS mounted location giving intermittent issues
我有一个使用 Yarn 作为集群管理器的三节点 Spark 集群 [运行 在三节点 hadoop 集群上。
考虑一下,我的 Hadoop 集群有三个节点 [Master、Slave1 和 Slave2] Resourcemanager 在 Master 上是 运行,NodaManager 在 Slave1 和 Slave2 上。 Spark Cluster 也存在于三个节点上。
在主节点上,我创建了一个文件夹 /data/nfsshare,我将其作为 /nfsshare 安装在 Slave1 和 Slave2 上。现在我在 /data/nfsshare 文件夹中保存了一个文件 abc.txt,在 /nfsshare 位置对 slave1 和 slave2 都可见。
我创建了一个小型 spark 作业,用于将 abc.txt 从 /data/nfsshare 位置复制到 HDFS,还执行字数统计并将其结果保存在 HDFS 中。
def write2HDFS(args:Array[String]){
val source = args(0)
val destination = args(1)
val processedDataDestination = args(2)
val conf = new SparkConf().setAppName("WCRemoteReadHDFSWrite").set("spark.hadoop.validateOutputSpecs", "true");
val sc = new SparkContext(conf)
logger.info("STARTING READ")
val rdd = sc.textFile(source)
logger.info("READING DONE")
logger.info("STARTING WRITE")
logger.info("rdd.toDebugString >>>>>> "+rdd.toDebugString)
logger.info("rdd.getNumPartitions >>>>>>>>" +rdd.getNumPartitions)
// rdd.coalesce(1)
// logger.info("rdd.getNumPartitions after coalesce(1) >>>>>>>>" +rdd.getNumPartitions)
rdd.saveAsTextFile(destination)
logger.info("DONE")
rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_).saveAsTextFile(processedDataDestination)
sc.stop}
当我尝试使用命令执行此代码时:
./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar file:///data/nfsshare/abc.txt hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWrite1MB hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWriteP1MB
我遇到以下间歇性问题:
1) InputPath 不存在:文件:/data/nfsshare/abc.txt,在此作业的某些运行期间间歇性出现 [而文件存在于共享 location/mounted 路径]
2) Sometimes/intermittently 作业状态为失败,但正在使用所需数据创建输出目录
3) 输出目录已经存在:有时 HDFS 输出目录存在问题即将到来 --> 这通过增加执行程序和驱动程序内存得到解决
-->我在集群和客户端部署模式下都尝试了 运行 这个作业,但在这两种情况下我都遇到了同样的问题。
我不确定 Master 的共享位置路径 /data/nfsshare 和 slave 的 /nfsshare 是否有任何区别?因为在命令行我将 /data/nfsshare 作为文件路径位置传递,因此每当从属上的任何执行程序 运行 寻找 /data/nfsshare 都会失败。
我在所有三个节点上都尝试了 运行 这个作业,但这些间歇性问题仍然存在。
如有任何专家建议,我们将不胜感激。
如果有任何其他更好的方法可以将文件从任何暂存 area/mounted 位置放入 HDFS,那么也请分享它。
此致,
布佩什
- 最好将您的输入文件上传到没有 spark 的 hdfs,只需使用 hdfs dfs -copyFromLocal 将其上传到 hdfs 或者您可以尝试使用 hdfs 客户端库上传它,但单线程没有 spark api .通常假设输入数据已经在分布式文件系统(s3、hdfs 等)中。使用 nfs 时,您可能会看到各种效果。所以从设计的角度来看,管道的某些部分将数据放入 s3/hdfs,然后才开始进行 spark 的并行处理。
- 是的,如果你 运行 一遍又一遍地做同样的工作,你应该清理你的输出目录,我认为有 spark 配置允许你禁用这个验证,但是更好地设计你的应用程序来编写每次都进入新路径
Q1, Q2) 你提到你 nfs 将目录从本地 /data/nfsshare 挂载到 HDFS 上的 /nfsshare。如果您已成功完成此操作并且已验证它正在运行,为什么不将其用作您的输入路径?
当您尝试使用本地文件系统时,在 YARN 模式下事情会变得有些棘手。如果您使用的是分布式计算,最好将您的输入保存在 HDFS 中。所以,你的 spark-submit 命令变成,
./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar /nfsshare/path /user/hduser/hdfsWrite1MB /user/hduser/hdfsWriteP1MB
注意我省略了hdfs://
,这是因为Spark环境默认的文件系统是HDFS
Q3) 输出目录已存在:您可以在保存文件之前执行此操作,如 here、
所述
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://host:port/"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(/path/to/output), true) }
catch { case _ : Throwable => { } }
或者如果您不想一次又一次地删除内容,您可以简单地将当前时间戳附加到您的输出路径。只有在处理 RDD 时,Dataframe API 才能覆盖现有路径。
PS: 您的 Q1 显示输入路径为 file:/data/nfsshare/test-1MB
而在 spark-submit
命令中输入显示 file:///data/nfsshare/abc.txt
。 abc.txt
是目录吗?
如果有帮助请告诉我。干杯。
实际上,由于安装的文件夹名称,我间歇性地面临 InputPath Doesnt exist: file:/data/nfsshare/abc.txt。一旦我在所有节点上保持相同的名称 [/data/nfsshare]。此问题已解决。
我假设当我 运行 在集群模式下运行我的 spark 作业时,YARN 正在决定 运行 驱动程序和执行程序的位置,因此如果所有执行程序都是 运行主节点上的 ning [从 /data/nfsshare] 可见,作业工作正常,而对于此路径作为 /nfsshare 的其他执行程序,此抛出路径相关问题。一旦路径问题得到解决,所有执行者都能够看到文件路径为 /data/nfsshare
此外,对于已经存在的输出目录,Chitral Verma 的代码片段有所帮助。
我有一个使用 Yarn 作为集群管理器的三节点 Spark 集群 [运行 在三节点 hadoop 集群上。
考虑一下,我的 Hadoop 集群有三个节点 [Master、Slave1 和 Slave2] Resourcemanager 在 Master 上是 运行,NodaManager 在 Slave1 和 Slave2 上。 Spark Cluster 也存在于三个节点上。
在主节点上,我创建了一个文件夹 /data/nfsshare,我将其作为 /nfsshare 安装在 Slave1 和 Slave2 上。现在我在 /data/nfsshare 文件夹中保存了一个文件 abc.txt,在 /nfsshare 位置对 slave1 和 slave2 都可见。
我创建了一个小型 spark 作业,用于将 abc.txt 从 /data/nfsshare 位置复制到 HDFS,还执行字数统计并将其结果保存在 HDFS 中。
def write2HDFS(args:Array[String]){
val source = args(0)
val destination = args(1)
val processedDataDestination = args(2)
val conf = new SparkConf().setAppName("WCRemoteReadHDFSWrite").set("spark.hadoop.validateOutputSpecs", "true");
val sc = new SparkContext(conf)
logger.info("STARTING READ")
val rdd = sc.textFile(source)
logger.info("READING DONE")
logger.info("STARTING WRITE")
logger.info("rdd.toDebugString >>>>>> "+rdd.toDebugString)
logger.info("rdd.getNumPartitions >>>>>>>>" +rdd.getNumPartitions)
// rdd.coalesce(1)
// logger.info("rdd.getNumPartitions after coalesce(1) >>>>>>>>" +rdd.getNumPartitions)
rdd.saveAsTextFile(destination)
logger.info("DONE")
rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_).saveAsTextFile(processedDataDestination)
sc.stop}
当我尝试使用命令执行此代码时:
./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar file:///data/nfsshare/abc.txt hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWrite1MB hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWriteP1MB
我遇到以下间歇性问题:
1) InputPath 不存在:文件:/data/nfsshare/abc.txt,在此作业的某些运行期间间歇性出现 [而文件存在于共享 location/mounted 路径]
2) Sometimes/intermittently 作业状态为失败,但正在使用所需数据创建输出目录
3) 输出目录已经存在:有时 HDFS 输出目录存在问题即将到来 --> 这通过增加执行程序和驱动程序内存得到解决
-->我在集群和客户端部署模式下都尝试了 运行 这个作业,但在这两种情况下我都遇到了同样的问题。
我不确定 Master 的共享位置路径 /data/nfsshare 和 slave 的 /nfsshare 是否有任何区别?因为在命令行我将 /data/nfsshare 作为文件路径位置传递,因此每当从属上的任何执行程序 运行 寻找 /data/nfsshare 都会失败。
我在所有三个节点上都尝试了 运行 这个作业,但这些间歇性问题仍然存在。
如有任何专家建议,我们将不胜感激。
如果有任何其他更好的方法可以将文件从任何暂存 area/mounted 位置放入 HDFS,那么也请分享它。
此致, 布佩什
- 最好将您的输入文件上传到没有 spark 的 hdfs,只需使用 hdfs dfs -copyFromLocal 将其上传到 hdfs 或者您可以尝试使用 hdfs 客户端库上传它,但单线程没有 spark api .通常假设输入数据已经在分布式文件系统(s3、hdfs 等)中。使用 nfs 时,您可能会看到各种效果。所以从设计的角度来看,管道的某些部分将数据放入 s3/hdfs,然后才开始进行 spark 的并行处理。
- 是的,如果你 运行 一遍又一遍地做同样的工作,你应该清理你的输出目录,我认为有 spark 配置允许你禁用这个验证,但是更好地设计你的应用程序来编写每次都进入新路径
Q1, Q2) 你提到你 nfs 将目录从本地 /data/nfsshare 挂载到 HDFS 上的 /nfsshare。如果您已成功完成此操作并且已验证它正在运行,为什么不将其用作您的输入路径?
当您尝试使用本地文件系统时,在 YARN 模式下事情会变得有些棘手。如果您使用的是分布式计算,最好将您的输入保存在 HDFS 中。所以,你的 spark-submit 命令变成,
./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar /nfsshare/path /user/hduser/hdfsWrite1MB /user/hduser/hdfsWriteP1MB
注意我省略了hdfs://
,这是因为Spark环境默认的文件系统是HDFS
Q3) 输出目录已存在:您可以在保存文件之前执行此操作,如 here、
所述val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://host:port/"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(/path/to/output), true) }
catch { case _ : Throwable => { } }
或者如果您不想一次又一次地删除内容,您可以简单地将当前时间戳附加到您的输出路径。只有在处理 RDD 时,Dataframe API 才能覆盖现有路径。
PS: 您的 Q1 显示输入路径为 file:/data/nfsshare/test-1MB
而在 spark-submit
命令中输入显示 file:///data/nfsshare/abc.txt
。 abc.txt
是目录吗?
如果有帮助请告诉我。干杯。
实际上,由于安装的文件夹名称,我间歇性地面临 InputPath Doesnt exist: file:/data/nfsshare/abc.txt。一旦我在所有节点上保持相同的名称 [/data/nfsshare]。此问题已解决。
我假设当我 运行 在集群模式下运行我的 spark 作业时,YARN 正在决定 运行 驱动程序和执行程序的位置,因此如果所有执行程序都是 运行主节点上的 ning [从 /data/nfsshare] 可见,作业工作正常,而对于此路径作为 /nfsshare 的其他执行程序,此抛出路径相关问题。一旦路径问题得到解决,所有执行者都能够看到文件路径为 /data/nfsshare
此外,对于已经存在的输出目录,Chitral Verma 的代码片段有所帮助。