Spark check if input path with regex exists
I am trying to read JSON files from paths computed with a regex, as shown below.
paths.par.foreach { path =>
  val pathWithRegex = s"${path}/*/${dateRegex}/"
  val jsonDF = sqlContext.read.json(pathWithRegex)
}
paths could be - hdfs://servername/data/a, hdfs://servername/data/b, hdfs://servername/data/c
dateRegex could be - 2020-05-*
Directories present in hdfs
hdfs://servername/data/a/something/2020-05-11/file1
hdfs://servername/data/a/something/2020-05-12/file1
hdfs://servername/data/b/something/2020-05-11/file1
hdfs://servername/data/c/something/2020-06-11/file1
When I pass 2020-05-* as dateRegex, it throws an error for hdfs://servername/data/c/*/2020-05-*/ because the path does not exist.
Is there a way to skip the error and continue?
I tried the checkDirExist method below, but it doesn't seem to work when the path string contains a regex/pattern.
def checkDirExist(path: String, sc: SparkContext): Boolean = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val p = new Path(path)
  fs.exists(p)
}
paths.par.foreach { path =>
  val pathWithRegex = s"${path}/*/${dateRegex}/"
  if (checkDirExist(pathWithRegex, sc)) { // Doesn't work. Always false if pattern is in path string
    val jsonDF = sqlContext.read.json(pathWithRegex)
  }
}
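For context on why the check above always returns false: fs.exists treats * as a literal character in the path, so it never matches the expanded directories. A glob-aware check would go through FileSystem.globStatus instead, which resolves wildcards; below is a minimal sketch reusing the same paths, dateRegex, sc and sqlContext as above, not a tested solution.

import org.apache.spark.SparkContext
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: globStatus expands the wildcard and returns the matching statuses
// (null or an empty array when nothing matches), unlike fs.exists.
def globExists(pattern: String, sc: SparkContext): Boolean = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val matches = fs.globStatus(new Path(pattern))
  matches != null && matches.nonEmpty
}

paths.par.foreach { path =>
  val pathWithRegex = s"${path}/*/${dateRegex}/"
  if (globExists(pathWithRegex, sc)) { // only read globs that resolved to something
    val jsonDF = sqlContext.read.json(pathWithRegex)
  }
}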
Check the code below. It works for hdfs, s3, and the local filesystem.
Importing the required libraries.
import scala.util.matching.Regex
import java.net.URI
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
Implicit conversion to a Scala Iterator:
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = remoteIterator.hasNext
    override def next(): T = remoteIterator.next()
  }
  wrapper(remoteIterator)
}
List of available directories and files:
import sys.process._
scala> "tree /root/tmp/servername".!
/root/tmp/servername
└── data
├── a
│ └── something
│ ├── 2020-05-11
│ │ └── file1
│ └── 2020-05-12
│ └── file1
├── b
│ └── something
│ └── 2020-05-11
│ └── file1
└── c
└── something
└── 2020-06-11
└── file1
11 directories, 4 files
Getting a FileSystem object:
def getFs(spark: SparkSession): String => FileSystem = (path: String) => {
  FileSystem.get(URI.create(path), spark.sparkContext.hadoopConfiguration)
}

val fs = getFs(spark)
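As a quick illustration (a hypothetical snippet, assuming the local tree shown above), fs picks the filesystem implementation from the path's scheme, and the implicit conversion defined earlier is what lets listFiles, which returns a Hadoop RemoteIterator, be consumed with ordinary Scala collection methods:

// Resolve the filesystem for a local path and list everything under it recursively;
// .toList only compiles because of convertToScalaIterator above.
val localFs = fs("/root/tmp/servername")
val allFiles = localFs
  .listFiles(new Path("/root/tmp/servername"), true)
  .toList
  .map(_.getPath.toString)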
Function that lists the files available under a given path and returns the file paths matching the regex pattern:
def exists(path: String, find: Regex)(fs: String => FileSystem) = {
  fs(path)
    .listFiles(new Path(path), true)
    .toList
    .filter(_.isFile)
    .map(_.getPath)
    .filter(c => find.findAllIn(c.toString).length != 0)
}
val fileList = exists("/root/tmp/servername", "2020-05-*".r)(fs)
Final output:
scala> fileList.foreach(println)
file:/root/tmp/servername/data/b/something/2020-05-11/file1
file:/root/tmp/servername/data/a/something/2020-05-11/file1
file:/root/tmp/servername/data/a/something/2020-05-12/file1
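If the goal is to then load the matched files as JSON, one option (a minimal sketch, assuming a SparkSession named spark and the fileList above) is to pass the resolved paths straight to the reader, so a pattern that matches nothing simply results in no read at all:

// fileList holds org.apache.hadoop.fs.Path values, so convert them to strings;
// DataFrameReader.json accepts multiple paths as varargs.
val matchedPaths = fileList.map(_.toString)

if (matchedPaths.nonEmpty) {
  val jsonDF = spark.read.json(matchedPaths: _*)
  jsonDF.show(false)
}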
To get the metadata of the files, check this post -