如何使用 ssc.filestream() 处理 Java 中的 zip 目录

How to use ssc.filestream() to handle a zip directory in Java

我是 Spark Streaming 的新手。

我想监控和解压特定目录中的所有 .zip 文件。 我参考了http://cutler.io/2012/07/hadoop-processing-zip-files-in-mapreduce/,写了下面的代码

JavaPairInputDStream<Text, BytesWritable> streamlogFiles=ssc.fileStream(logDir, Text.class, BytesWritable.class, ZipFileInputFormat.class);

但是,我发现 fileStream() 不处理 zip 文件 exsitedin /moved into 指定目录。

有什么想念的吗?

您可以在此处使用 ZipFileInputFormat:https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop

并使用

创建一个文件流
val files = ssc.fileStream[Text, BytesWritable, ZipFileInputFormat](someInputDirectory)

files.foreachRDD{ rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { record =>
      process(record._1.toString, record._2)
    }
  }
}

其中 record._1.toString 是文件名,因为 record._2 是该文件的 BytesWriteable。如果您不希望 InputFormat 解压缩 .zip,您将需要一个不同的自定义 FileInputFormat 或必须修改 ZipFileInputFormat。

为了对此进行测试 - 确保您添加到 someInputDirectory 的 .zip 文件的最后修改时间小于 1 分钟,否则默认情况下 SparkStreaming 将忽略它。