How to check if an HDFS folder contains CSV or Parquet files?
How can I programmatically decide whether to use
spark.read.csv(path)
or
spark.read.parquet(path)
when the user does not say whether the path contains Parquet or text files? The path is expected to be on HDFS.
One approach is to run the hdfs dfs -ls
command and inspect its output to see whether the input directory contains csv
or parquet
files.
For example:
// These two imports are needed to run shell commands from Scala
import scala.sys.process._
import scala.language.postfixOps
// A small example of how this could look
def getExtension(s: String): String = {
if(s.contains(".parquet")) "parquet"
else if(s.contains(".csv")) "csv"
else if (s.contains(".txt")) "txt"
else "unknown"
}
val inputDirCsv = "hdfs://quickstart.cloudera:8020/user/cloudera/csv"
val inputDirParquet = "hdfs://quickstart.cloudera:8020/user/cloudera/parquet"
val lsCommand = Seq("hdfs", "dfs", "-ls", inputDirCsv).!!
println(lsCommand)
/*
Found 4 items
-rw-r--r-- 1 cloudera supergroup 109 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/EmployeeManager.csv
-rw-r--r-- 1 cloudera supergroup 8754 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/amigos.csv
-rw-r--r-- 1 cloudera supergroup 142 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/updated_departments.csv
-rw-r--r-- 1 cloudera supergroup 79 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/user.csv
*/
println(getExtension(lsCommand)) // csv
val lsCommand1 = Seq("hdfs", "dfs", "-ls", inputDirParquet).!!
println(lsCommand1)
/*
Found 5 items
-rw-r--r-- 3 cloudera supergroup 0 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/_SUCCESS
-rw-r--r-- 3 cloudera supergroup 599 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00000-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r-- 3 cloudera supergroup 645 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00001-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r-- 3 cloudera supergroup 586 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00002-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r-- 3 cloudera supergroup 645 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00003-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
*/
println(getExtension(lsCommand1)) // parquet
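Putting the pieces together, the detected extension can drive which Spark reader to call. This is a hypothetical helper (not from the original answer): it assumes an active SparkSession named spark and reuses the getExtension function defined above.

```scala
import scala.sys.process._
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical dispatcher: list the directory, detect the extension,
// and pick the matching Spark reader. "unknown" is treated as an error.
def readByExtension(spark: SparkSession, path: String): DataFrame = {
  val listing = Seq("hdfs", "dfs", "-ls", path).!!
  getExtension(listing) match {
    case "parquet"     => spark.read.parquet(path)
    case "csv" | "txt" => spark.read.option("header", "true").csv(path)
    case other         => throw new IllegalArgumentException(s"Unsupported format: $other")
  }
}
```

Whether CSV files have a header row is an assumption here; drop the option("header", "true") call if they do not.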
Rather than checking the extensions programmatically, I would leverage Scala's Try
and attempt to read each file type one by one using the orElse
function -
import scala.util.Try
import org.apache.spark.sql.DataFrame

def readCsv(): Try[DataFrame] = ???
def readParquet(): Try[DataFrame] = ???

val dfTry: Try[DataFrame] = readCsv().orElse(readParquet())
If most of your reads are Parquet, you can make the readParquet()
call first.
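A possible way to fill in the stubs above (a sketch under the assumption that a SparkSession named spark and a path string are in scope; none of these names come from the original answer):

```scala
import scala.util.Try
import org.apache.spark.sql.{DataFrame, SparkSession}

// Wrap each reader in Try so a failure falls through to the next format.
def readCsv(spark: SparkSession, path: String): Try[DataFrame] =
  Try(spark.read.option("header", "true").csv(path))

def readParquet(spark: SparkSession, path: String): Try[DataFrame] =
  Try(spark.read.parquet(path))

// Parquet first: reading a non-Parquet file as Parquet fails eagerly
// (invalid footer), so the fallback triggers at read time.
def readAny(spark: SparkSession, path: String): Try[DataFrame] =
  readParquet(spark, path).orElse(readCsv(spark, path))
```

One caveat worth noting: Spark reads are lazy, so reading Parquet data as CSV may not fail until an action runs; forcing a small evaluation inside the Try (e.g. calling df.head()) makes the fallback more reliable in that direction.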