如何检查 HDFS 文件夹是否包含 CSV parquet 文件?

How to check if HDFS folder contain CSV parquet files?

如何以编程方式检查何时使用

spark.read.csv(path) 

spark.read.parquet(path)

没有用户说明路径是否包含镶木地板或文本文件。该路径应该在 HDFS 上。

一种方法可以是 运行 hdfs dfs -ls 命令并检查输出以查看输入目录是否包含 csvparquet 文件。

举个例子

// This two imports are necessary to run shell commands from Scala
import scala.sys.process._
import scala.language.postfixOps 

// As a little example of how it could be
def getExtension(s: String): String = {
    if(s.contains(".parquet")) "parquet"
    else if(s.contains(".csv")) "csv"
    else if (s.contains(".txt")) "txt"
    else "unknown"
}

val inputDirCsv = "hdfs://quickstart.cloudera:8020/user/cloudera/csv"

val inputDirParquet = "hdfs://quickstart.cloudera:8020/user/cloudera/parquet"

val lsCommand = Seq("hdfs", "dfs", "-ls", inputDirCsv).!!
println(lsCommand)
/*
Found 4 items
-rw-r--r--   1 cloudera supergroup        109 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/EmployeeManager.csv
-rw-r--r--   1 cloudera supergroup       8754 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/amigos.csv
-rw-r--r--   1 cloudera supergroup        142 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/updated_departments.csv
-rw-r--r--   1 cloudera supergroup         79 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/user.csv
 */
println(getExtension(lsCommand)) // csv

val lsCommand1 = Seq("hdfs", "dfs", "-ls", inputDirParquet).!!
println(lsCommand1)
/*
Found 5 items
-rw-r--r--   3 cloudera supergroup          0 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/_SUCCESS
-rw-r--r--   3 cloudera supergroup        599 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00000-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r--   3 cloudera supergroup        645 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00001-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r--   3 cloudera supergroup        586 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00002-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r--   3 cloudera supergroup        645 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00003-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
 */
println(getExtension(lsCommand1)) // parquet

我会利用 scala Try 并尝试使用 orElse 函数逐一导入文件类型,而不是以编程方式检查它们的扩展名 -

def readCsv(): Try[DataFrame] = ???
def readParquet(): Try[DataFrame] = ???

val dfTry: Try[DataFrame] = readCsv().orElse(readParquet())

如果您有更多的 parquet 读取请求,可以先 readParquet() 调用。