使用 Spark Scala 处理具有特定日期范围的目录中的数据

Processing data in directories with specific date range using Spark Scala

我正在尝试使用 Spark Scala 代码从 HDFS 文件夹加载增量数据。 所以假设我有以下文件夹:

/hadoop/user/src/2021-01-22
/hadoop/user/src/2021-01-23
/hadoop/user/src/2021-01-24
/hadoop/user/src/2021-01-25
/hadoop/user/src/2021-01-26
/hadoop/user/src/2021-01-27
/hadoop/user/src/2021-01-28
/hadoop/user/src/2021-01-29

我从 spark-submit 命令给出路径 /hadoop/user/src 然后写下面的代码

val Temp_path: String = args(1) // hadoop/user/src
val incre_path = ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val incre_path_day = formatter format incre_path
val new_path = Temp_path.concat("/")
val path = new_path.concat(incre_path_day)

所以它处理 (sysdate-1) 文件夹,即今天的日期是 2021-01-29 所以它会处理 2021-01-28 目录的数据。

有什么方法可以修改代码,这样我就可以给出像 hadoop/user/src/2021-01-22 这样的路径,代码将处理数据直到 2021-01-28(即 2021-01-23、2021-01-24、2021- 01-25, 2021-01-26, 2021-01-27, 2021-01-28).

请建议我应该如何修改我的代码。

您可以使用 Hadoop 文件系统中的 listStatus 列出输入文件夹中的所有文件夹并过滤日期部分:

import org.apache.hadoop.fs.Path
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

val inputPath = "hadoop/user/src/2021-01-22"

val startDate = inputPath.substring(inputPath.lastIndexOf("/") + 1)
val endDate = DateTimeFormatter.ofPattern("yyyy-MM-dd").format(ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1))

val baseFolder = new Path(inputPath.substring(0, inputPath.lastIndexOf("/") + 1))

val files = baseFolder.getFileSystem(sc.hadoopConfiguration).listStatus(baseFolder).map(_.getPath.toString)
val filtredFiles = files.filter(path => path.split("/").last > startDate &&  path.split("/").last < endDate)

// finally load only the folders you want
val df = spark.read.csv(filtredFiles: _*) 

您还可以将 PathFilter 函数传递给 listStatus 以在扫描基本文件夹时过滤路径