Spark 阅读 .7z 文件

Spark Reading .7z files

我正在尝试使用 scala 或 java 读取 spark .7z 文件。我没有找到任何合适的方法或功能。

对于 zip 文件,我能够读取,因为 ZipInputStream class 采用输入流,但对于 7Z 文件,class SevenZFile 不采用任何输入流。 https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/SevenZFile.html

Zip 文件代码

spark.sparkContext.binaryFiles("fileName").flatMap{case (name: String, content: PortableDataStream) =>
        val zis = new ZipInputStream(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}

我正在为 7z 文件尝试类似的代码,例如

spark.sparkContext.binaryFiles(""filename"").flatMap{case (name: String, content: PortableDataStream) =>
        val zis = new SevenZFile(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}

但 SevenZFile 不接受这些 formats.Looking 的想法。

如果文件在本地文件系统中,下面的解决方案有效,但我的文件在 hdfs 中

本地文件系统代码

 public static void decompress(String in, File destination) throws IOException {
        SevenZFile sevenZFile = new SevenZFile(new File(in));
        SevenZArchiveEntry entry;
        while ((entry = sevenZFile.getNextEntry()) != null){
            if (entry.isDirectory()){
                continue;
            }
            File curfile = new File(destination, entry.getName());
            File parent = curfile.getParentFile();
            if (!parent.exists()) {
                parent.mkdirs();
            }
            FileOutputStream out = new FileOutputStream(curfile);
            byte[] content = new byte[(int) entry.getSize()];
            sevenZFile.read(content, 0, content.length);
            out.write(content);
            out.close();
        }
    }

经过这么多年的火花进化,应该有简单的方法来做到这一点。

您可以尝试 SeekableByteChannel 方法,而不是使用基于 java.io.File 的方法,如 alternative constructor.

中所示

您可以使用 SeekableInMemoryByteChannel 来读取字节数组。因此,只要您可以从 S3 或其他任何地方获取 7zip 文件并将它们作为字节数组提交,您就应该没问题。

综上所述,Spark 确实 well-suited 无法处理 zip 和 7zip 文件等内容。我可以根据个人经验告诉你,一旦文件太大以至于 Spark 的执行程序无法处理,它就会严重失败。

像 Apache NiFi 这样的东西在扩展档案和处理它们方面会更好。 FWIW,我目前正在处理一个大型数据转储,我经常处理其中包含数百万个文件的 50GB tarball,而 NiFi 处理它们非常优雅。