文件过滤器在 Spark 中不起作用 StreamingContext.fileStream(...) API

File filter not working in Spark StreamingContext.fileStream(...) API

我正在构建一个 Spark Streaming 应用程序,我的要求是读取被监视目录中的所有预先存在的文件。

我为此使用 StreamingContext.fileStream(...) API。这个API需要一个通过过滤功能。在我的例子中,我总是从这里 returning true 因为我需要阅读所有文件。 StreamingContext.fileStream(...) 中的 newFilesOnly 标志也设置为 false.

[这里是 API 文档]

但是,无论returns或newFilesOnly标志设置成什么过滤函数,相应DStream中创建的RDD都是空的。

代码片段如下:

val ssc = new StreamingContext(sparkConf, Seconds(30))
val filterF = new Function[Path, Boolean] {
    def apply(x: Path): Boolean = {
      println("In File " + x.toString) //Prints exisitng file's path as expected
      true
    }
}
val strm = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://<bucket>/", filterF, false).map(_._2.toString)
strm.print()     //DOESN'T PRINT ANYTHING

我已经尝试了 return 过滤器函数和 newFilesOnly 标志值的不同组合,但没有任何效果。

如果我改用 StreamingContext.textFileStream(...),它工作正常,但只读取新文件,这是此 API.

的预期行为

我是不是漏掉了什么?任何帮助将不胜感激。提前致谢!

通过增加 FileInputDStream 的忽略 window 解决了这个问题。这可以通过更改 spark.streaming.fileStream.minRememberDuration 属性 来完成。 默认值为 1 分钟,我测试的所有文件的修改时间都超过 1 分钟,因此它们被忽略了。 有关详细信息,请参阅代码文档 here