如何使用 ssc.filestream() 处理 Java 中的 zip 目录
How to use ssc.filestream() to handle a zip directory in Java
我是 Spark Streaming 的新手。
我想监控和解压特定目录中的所有 .zip 文件。
我参考了http://cutler.io/2012/07/hadoop-processing-zip-files-in-mapreduce/,写了下面的代码
JavaPairInputDStream<Text, BytesWritable> streamlogFiles=ssc.fileStream(logDir, Text.class, BytesWritable.class, ZipFileInputFormat.class);
但是,我发现 fileStream() 不处理 zip 文件 exsitedin /moved into 指定目录。
有什么想念的吗?
您可以在此处使用 ZipFileInputFormat:https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop
并使用
创建一个文件流
val files = ssc.fileStream[Text, BytesWritable, ZipFileInputFormat](someInputDirectory)
files.foreachRDD{ rdd =>
rdd.foreachPartition { partition =>
partition.foreach { record =>
process(record._1.toString, record._2)
}
}
}
其中 record._1.toString
是文件名,因为 record._2
是该文件的 BytesWriteable。如果您不希望 InputFormat 解压缩 .zip,您将需要一个不同的自定义 FileInputFormat 或必须修改 ZipFileInputFormat。
为了对此进行测试 - 确保您添加到 someInputDirectory
的 .zip 文件的最后修改时间小于 1 分钟,否则默认情况下 SparkStreaming 将忽略它。
我是 Spark Streaming 的新手。
我想监控和解压特定目录中的所有 .zip 文件。 我参考了http://cutler.io/2012/07/hadoop-processing-zip-files-in-mapreduce/,写了下面的代码
JavaPairInputDStream<Text, BytesWritable> streamlogFiles=ssc.fileStream(logDir, Text.class, BytesWritable.class, ZipFileInputFormat.class);
但是,我发现 fileStream() 不处理 zip 文件 exsitedin /moved into 指定目录。
有什么想念的吗?
您可以在此处使用 ZipFileInputFormat:https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop
并使用
创建一个文件流val files = ssc.fileStream[Text, BytesWritable, ZipFileInputFormat](someInputDirectory)
files.foreachRDD{ rdd =>
rdd.foreachPartition { partition =>
partition.foreach { record =>
process(record._1.toString, record._2)
}
}
}
其中 record._1.toString
是文件名,因为 record._2
是该文件的 BytesWriteable。如果您不希望 InputFormat 解压缩 .zip,您将需要一个不同的自定义 FileInputFormat 或必须修改 ZipFileInputFormat。
为了对此进行测试 - 确保您添加到 someInputDirectory
的 .zip 文件的最后修改时间小于 1 分钟,否则默认情况下 SparkStreaming 将忽略它。