在 spark/scala 中循环处理多个 HDFS 文件

Loop and process multiple HDFS files in spark/scala

我的 HDFS 文件夹中有多个文件,我想循环并运行我的 scala 转换逻辑。

我正在使用下面的脚本,它在我使用本地文件的开发环境中工作正常,但是当我在我的 HDFS 环境中 运行 时它失败了。知道我哪里做错了吗?

val files = new File("hdfs://172.X.X.X:8020/landing/").listFiles.map(_.getName).toList

files.foreach { file =>
print(file) 
val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + file)
event.show(false)
}

有人可以更正它或提出替代解决方案吗?

您应该使用 Hadoop IO 库来处理 hadoop 文件。

代码:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark=SparkSession.builder().master("local[*]").getOrCreate()

val fs=FileSystem.get(new URI("hdfs://172.X.X.X:8020/"),spark.sparkContext.hadoopConfiguration)

fs.globStatus(new Path("/landing/*")).toList.foreach{
   f=>
   val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + f.getPath.getName)
   event.show(false)
}