Spark check if input path with regex exists

I am trying to read JSON files from paths that are built with a pattern, as shown below.

paths.par.foreach { path =>
    val pathWithRegex = s"${path}/*/${dateRegex}/"
    val jsonDF = sqlContext.read.json(pathWithRegex)
}
paths could be - hdfs://servername/data/a, hdfs://servername/data/b, hdfs://servername/data/c
dateRegex could be - 2020-05-*

Directories present in hdfs
hdfs://servername/data/a/something/2020-05-11/file1
hdfs://servername/data/a/something/2020-05-12/file1
hdfs://servername/data/b/something/2020-05-11/file1
hdfs://servername/data/c/something/2020-06-11/file1

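When I pass 2020-05-* as dateRegex, it throws an error for hdfs://servername/data/c/*/2020-05-*/ because no such path exists. Is there a way to skip the error and continue? I tried the checkDirExist method below, but it does not seem to work when the path contains a regex/pattern.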

def checkDirExist(path: String, sc:SparkContext): Boolean = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val p = new Path(path)
    fs.exists(p)
}

paths.par.foreach { path =>
    val pathWithRegex = s"${path}/*/${dateRegex}/"
    if (checkDirExist(pathWithRegex, sc)) { // Doesn't work. Always false if pattern is in path string
        val jsonDF = sqlContext.read.json(pathWithRegex)
    }
}

Check the code below.

It works for HDFS, S3, and the local filesystem.

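Import the required libraries.

import scala.language.implicitConversions // avoids the feature warning for the implicit conversion below
import scala.util.matching.Regex
import java.net.URI
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}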

Implicit conversion from a Hadoop RemoteIterator to a Scala Iterator.

implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
}

Available directories and files.

import sys.process._
scala> "tree /root/tmp/servername".!
/root/tmp/servername
└── data
    ├── a
    │   └── something
    │       ├── 2020-05-11
    │       │   └── file1
    │       └── 2020-05-12
    │           └── file1
    ├── b
    │   └── something
    │       └── 2020-05-11
    │           └── file1
    └── c
        └── something
            └── 2020-06-11
                └── file1

11 directories, 4 files

Get the FileSystem object.

def getFs(spark:SparkSession): String => FileSystem = (path: String) => {
    FileSystem.get(URI.create(path),spark.sparkContext.hadoopConfiguration)
}
val fs = getFs(spark)

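A function that recursively lists the files under the given path and returns the paths of those that match the regex pattern.

def exists(path: String, find: Regex)(fs: String => FileSystem) = {
    fs(path)
    .listFiles(new Path(path), true) // true = list recursively
    .toList.filter(_.isFile)
    .map(_.getPath)
    .filter(c => find.findFirstIn(c.toString).isDefined)
}
// The pattern is a regex, not a glob: in "2020-05-*" the trailing "-*" would mean "zero or more hyphens".
val fileList = exists("/root/tmp/servername", "2020-05-\\d{2}".r)(fs)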
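The paths in the question are Hadoop glob patterns rather than regular expressions, so another option is to let Hadoop expand the glob itself. The following is only a sketch: globMatches and globExists are hypothetical helper names, not part of Spark or Hadoop. It relies on FileSystem.globStatus, which returns null for a missing non-glob path and an empty array for a glob without matches.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Returns the paths matched by a Hadoop glob such as ".../data/c/*/2020-05-*".
// A null result (missing non-glob path) and an empty array (glob without
// matches) both collapse to an empty list.
def globMatches(pattern: String, conf: Configuration): List[Path] = {
  val p  = new Path(pattern)
  val fs = p.getFileSystem(conf) // picks hdfs://, s3a://, file:// from the URI
  Option(fs.globStatus(p)).map(_.toList.map(_.getPath)).getOrElse(Nil)
}

// Hypothetical helper: true when at least one path matches the glob.
def globExists(pattern: String, conf: Configuration): Boolean =
  globMatches(pattern, conf).nonEmpty
```

In the loop from the question, the guard would then read if (globExists(pathWithRegex, sc.hadoopConfiguration)), assuming sc is the SparkContext used there.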

Final output

scala> fileList.foreach(println)
file:/root/tmp/servername/data/b/something/2020-05-11/file1
file:/root/tmp/servername/data/a/something/2020-05-11/file1
file:/root/tmp/servername/data/a/something/2020-05-12/file1
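Once the matching files are known, they can be passed to the JSON reader in one call, and the read can be skipped when nothing matched. This is a minimal sketch; readJsonIfAny is a hypothetical helper name, and it assumes the matched files are all JSON.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reads the given JSON files into one DataFrame. Returns None when the list
// is empty, so a pattern without matches is skipped instead of raising an error.
def readJsonIfAny(spark: SparkSession, files: Seq[String]): Option[DataFrame] =
  if (files.isEmpty) None
  else Some(spark.read.json(files: _*)) // json accepts a varargs list of paths
```

With the list above, this could be called as readJsonIfAny(spark, fileList.map(_.toString)).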

To get the files' metadata, see this post -