在 spark/scala 中循环处理多个 HDFS 文件
Loop and process multiple HDFS files in spark/scala
我的 HDFS 文件夹中有多个文件,我想循环并运行我的 scala 转换逻辑。
我正在使用下面的脚本,它在我使用本地文件的开发环境中工作正常,但是当我在我的 HDFS 环境中 运行 时它失败了。知道我哪里做错了吗?
val files = new File("hdfs://172.X.X.X:8020/landing/").listFiles.map(_.getName).toList
files.foreach { file =>
print(file)
val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + file)
event.show(false)
}
有人可以更正它或提出替代解决方案吗?
您应该使用 Hadoop IO 库来处理 hadoop 文件。
代码:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
val spark=SparkSession.builder().master("local[*]").getOrCreate()
val fs=FileSystem.get(new URI("hdfs://172.X.X.X:8020/"),spark.sparkContext.hadoopConfiguration)
fs.globStatus(new Path("/landing/*")).toList.foreach{
f=>
val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + f.getPath.getName)
event.show(false)
}
我的 HDFS 文件夹中有多个文件,我想循环并运行我的 scala 转换逻辑。
我正在使用下面的脚本,它在我使用本地文件的开发环境中工作正常,但是当我在我的 HDFS 环境中 运行 时它失败了。知道我哪里做错了吗?
val files = new File("hdfs://172.X.X.X:8020/landing/").listFiles.map(_.getName).toList
files.foreach { file =>
print(file)
val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + file)
event.show(false)
}
有人可以更正它或提出替代解决方案吗?
您应该使用 Hadoop IO 库来处理 hadoop 文件。
代码:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
val spark=SparkSession.builder().master("local[*]").getOrCreate()
val fs=FileSystem.get(new URI("hdfs://172.X.X.X:8020/"),spark.sparkContext.hadoopConfiguration)
fs.globStatus(new Path("/landing/*")).toList.foreach{
f=>
val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + f.getPath.getName)
event.show(false)
}