文件过滤器在 Spark 中不起作用 StreamingContext.fileStream(...) API
File filter not working in Spark StreamingContext.fileStream(...) API
我正在构建一个 Spark Streaming 应用程序,我的要求是读取被监视目录中的所有预先存在的文件。
我为此使用 StreamingContext.fileStream(...)
API。这个API需要一个通过过滤功能。在我的例子中,我总是从这里 returning true
因为我需要阅读所有文件。
StreamingContext.fileStream(...)
中的 newFilesOnly
标志也设置为 false
.
[这里是 API 文档]
但是,无论returns或newFilesOnly
标志设置成什么过滤函数,相应DStream中创建的RDD都是空的。
代码片段如下:
val ssc = new StreamingContext(sparkConf, Seconds(30))
val filterF = new Function[Path, Boolean] {
def apply(x: Path): Boolean = {
println("In File " + x.toString) //Prints exisitng file's path as expected
true
}
}
val strm = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://<bucket>/", filterF, false).map(_._2.toString)
strm.print() //DOESN'T PRINT ANYTHING
我已经尝试了 return 过滤器函数和 newFilesOnly 标志值的不同组合,但没有任何效果。
如果我改用 StreamingContext.textFileStream(...)
,它工作正常,但只读取新文件,这是此 API.
的预期行为
我是不是漏掉了什么?任何帮助将不胜感激。提前致谢!
通过增加 FileInputDStream
的忽略 window 解决了这个问题。这可以通过更改 spark.streaming.fileStream.minRememberDuration
属性 来完成。
默认值为 1 分钟,我测试的所有文件的修改时间都超过 1 分钟,因此它们被忽略了。
有关详细信息,请参阅代码文档 here。
我正在构建一个 Spark Streaming 应用程序,我的要求是读取被监视目录中的所有预先存在的文件。
我为此使用 StreamingContext.fileStream(...)
API。这个API需要一个通过过滤功能。在我的例子中,我总是从这里 returning true
因为我需要阅读所有文件。
StreamingContext.fileStream(...)
中的 newFilesOnly
标志也设置为 false
.
[这里是 API 文档]
但是,无论returns或newFilesOnly
标志设置成什么过滤函数,相应DStream中创建的RDD都是空的。
代码片段如下:
val ssc = new StreamingContext(sparkConf, Seconds(30))
val filterF = new Function[Path, Boolean] {
def apply(x: Path): Boolean = {
println("In File " + x.toString) //Prints exisitng file's path as expected
true
}
}
val strm = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://<bucket>/", filterF, false).map(_._2.toString)
strm.print() //DOESN'T PRINT ANYTHING
我已经尝试了 return 过滤器函数和 newFilesOnly 标志值的不同组合,但没有任何效果。
如果我改用 StreamingContext.textFileStream(...)
,它工作正常,但只读取新文件,这是此 API.
我是不是漏掉了什么?任何帮助将不胜感激。提前致谢!
通过增加 FileInputDStream
的忽略 window 解决了这个问题。这可以通过更改 spark.streaming.fileStream.minRememberDuration
属性 来完成。
默认值为 1 分钟,我测试的所有文件的修改时间都超过 1 分钟,因此它们被忽略了。
有关详细信息,请参阅代码文档 here。