从文件读取数据时 Spark "modifiedBefore" 选项
Spark "modifiedBefore" option while reading data from files
我正在使用 Spark-2.4 从 hadoop 读取文件。
需求是读取修改时间在某个给定值之前的文件。
我看到了提到选项 modifiedBefore
的 spark 文档,请参考下面的 spark 文档 Modification Time Path Filters,但我不确定它是否在 spark 2.4 中可用,如果不是如何我能做到吗?
选项 modifiedBefore
和 modifiedAfter
从 Spark 3+ 开始可用,并且只能用于批处理而不是流式处理。对于 Spark 2.4,您可以使用 Hadoop FileSystem globStatus
方法并使用 getModificationTime
.
过滤文件
这里是一个函数示例,该函数采用路径和阈值以及 returns 使用阈值过滤的文件路径列表:
import org.apache.hadoop.fs.Path
def getFilesModifiedBefore(path: Path, modifiedBefore: String) = {
val format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
val thresHoldTime = format.parse(modifiedBefore).getTime()
val files = path.getFileSystem(sc.hadoopConfiguration).globStatus(path)
files.filter(_.getModificationTime < thresHoldTime).map(_.getPath.toString)
}
然后与 spark.read.csv
一起使用:
val df = spark.read.csv(getFilesModifiedBefore(new Path("/mypath"), "2021-03-17T10:46:12"):_*)
我正在使用 Spark-2.4 从 hadoop 读取文件。 需求是读取修改时间在某个给定值之前的文件。
我看到了提到选项 modifiedBefore
的 spark 文档,请参考下面的 spark 文档 Modification Time Path Filters,但我不确定它是否在 spark 2.4 中可用,如果不是如何我能做到吗?
选项 modifiedBefore
和 modifiedAfter
从 Spark 3+ 开始可用,并且只能用于批处理而不是流式处理。对于 Spark 2.4,您可以使用 Hadoop FileSystem globStatus
方法并使用 getModificationTime
.
这里是一个函数示例,该函数采用路径和阈值以及 returns 使用阈值过滤的文件路径列表:
import org.apache.hadoop.fs.Path
def getFilesModifiedBefore(path: Path, modifiedBefore: String) = {
val format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
val thresHoldTime = format.parse(modifiedBefore).getTime()
val files = path.getFileSystem(sc.hadoopConfiguration).globStatus(path)
files.filter(_.getModificationTime < thresHoldTime).map(_.getPath.toString)
}
然后与 spark.read.csv
一起使用:
val df = spark.read.csv(getFilesModifiedBefore(new Path("/mypath"), "2021-03-17T10:46:12"):_*)