如何在 hadoop 的新目录中解压缩 .gz 文件?

How to unzip .gz files in a new directory in hadoop?

我在 hdfs 的文件夹中有一堆 .gz 文件。我想将所有这些 .gz 文件解压缩到 hdfs 中的一个新文件夹中。我应该怎么做?

您可以使用配置单元执行此操作(假设它是文本数据)。

create external table source (t str) location '<directory_with_gz_files>';
create external table target (t str) location '<target_dir>';
insert into table target select * from source;

数据将被解压缩到一组新文件中。

如果您不想更改名称并且您所在的节点上有足够的存储空间运行,您可以这样做。

hadoop fs -get <your_source_directory> <directory_name>
It will create a directory where you run hadoop command. cd to it and gunzip all the files
cd ..
hadoop fs -moveFromLocal <directory_name> <target_hdfs_path>

我可以想到通过 3 种不同的方式实现它。

  1. 使用Linux命令行

    以下命令对我有用。

    hadoop fs -cat /tmp/Links.txt.gz | gzip -d | hadoop fs -put - /tmp/unzipped/Links.txt
    

    我的压缩文件是 Links.txt.gz
    输出存储在 /tmp/unzipped/Links.txt

  2. 使用Java程序

    Hadoop The Definitve Guide书中,有一节是关于Codecs的。在该部分中,有一个使用 CompressionCodecFactory 解压缩输出的程序。我是re-producing那个代码:

    package com.myorg.hadooptests;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URI;
    
    public class FileDecompressor {
        public static void main(String[] args) throws Exception {
            String uri = args[0];
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path inputPath = new Path(uri);
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodec(inputPath);
            if (codec == null) {
                System.err.println("No codec found for " + uri);
                System.exit(1);
            }
            String outputUri =
            CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
            InputStream in = null;
            OutputStream out = null;
            try {
                in = codec.createInputStream(fs.open(inputPath));
                out = fs.create(new Path(outputUri));
                IOUtils.copyBytes(in, out, conf);
            } finally {
                IOUtils.closeStream(in);
                IOUtils.closeStream(out);
            }
        }
    }
    

    此代码将gz 文件路径作为输入。
    您可以按以下方式执行:

    FileDecompressor <gzipped file name>
    

    例如当我执行压缩文件时:

    FileDecompressor /tmp/Links.txt.gz
    

    我在以下位置获得了解压缩的文件:/tmp/Links.txt

    它将解压缩的文件存储在同一文件夹中。所以需要修改这段代码,取2个输入参数:<input file path> and <output folder>.

    让这个程序运行后,您可以编写一个 Shell/Perl/Python 脚本来为您拥有的每个输入调用这个程序。

  3. 使用 Pig 脚本

    您可以编写一个简单的 Pig 脚本来实现此目的。

    我编写了以下脚本,它有效:

    A = LOAD '/tmp/Links.txt.gz' USING PigStorage();
    Store A into '/tmp/tmp_unzipped/' USING PigStorage();
    mv /tmp/tmp_unzipped/part-m-00000 /tmp/unzipped/Links.txt
    rm /tmp/tmp_unzipped/
    

    当您运行此脚本时,解压缩的内容存储在一个临时文件夹中:/tmp/tmp_unzipped。此文件夹将包含

    /tmp/tmp_unzipped/_SUCCESS
    /tmp/tmp_unzipped/part-m-00000
    

    part-m-00000 包含解压缩的文件。

    因此,我们需要使用以下命令显式重命名它,最后删除 /tmp/tmp_unzipped 文件夹:

    mv /tmp/tmp_unzipped/part-m-00000 /tmp/unzipped/Links.txt
    rm /tmp/tmp_unzipped/
    

    因此,如果您使用此 Pig 脚本,则只需注意参数化文件名(Links.txt.gz 和 Links.txt)。

    同样,一旦您使此脚本运行,您可以编写一个 Shell/Perl/Python 脚本来为您拥有的每个输入调用此 Pig 脚本。

如果您有压缩的文本文件,hadoop fs -text 支持 gzip 以及其他常见的压缩格式(snappy、lzo)。

hadoop fs -text /tmp/a.gz | hadoop fs -put - /tmp/uncompressed_a

Bash解决方案

就我而言,我不想通过管道解压缩文件,因为我不确定它们的内容。相反,我想确保 zip 文件中的所有文件都将提取到 HDFS 上。

我创建了一个简单的 bash 脚本。评论应该给你一个线索是怎么回事。下面有一个简短的描述。

#!/bin/bash

workdir=/tmp/unziphdfs/
cd $workdir

# get all zip files in a folder
zips=$(hadoop fs -ls /yourpath/*.zip | awk '{print }')
for hdfsfile in $zips
do
    echo $hdfsfile

    # copy to temp folder to unpack
    hdfs dfs -copyToLocal $hdfsfile $workdir

    hdfsdir=$(dirname "$hdfsfile")
    zipname=$(basename "$hdfsfile")

    # unpack locally and remove
    unzip $zipname
    rm -rf $zipname

    # copy files back to hdfs
    files=$(ls $workdir)
    for file in $files; do
       hdfs dfs -copyFromLocal $file $hdfsdir
       rm -rf $file
    done

    # optionally remove the zip file from hdfs?
    # hadoop fs -rm -skipTrash $hdfsfile
done

描述

  1. 获取 hdfs 目录中的所有 *.zip 个文件
  2. 逐一:将 zip 复制到临时目录(在文件系统上)
  3. 解压缩
  4. 将所有解压缩的文件复制到压缩文件的目录
  5. 清理

我使用 /mypath/*/*.zip 设法让它与每个 zip 文件中的许多 zip 文件的子目录结构一起工作。

祝你好运:)

Hadoop 的 FileUtil class 有 unTar() and unZip() 方法来实现这一点。 unTar() 方法也适用于 .tar.gz.tgz 文件。不幸的是,它们只能处理本地文件系统上的文件。您必须使用相同的 class 的 copy() 方法之一来复制您需要使用的任何分布式文件系统。

提供 Scala 代码

import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, CompressionInputStream}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.io.IOUtils
 val conf = new org.apache.hadoop.conf.Configuration()


 def extractFile (sparkSession: SparkSession, compath : String, uncompPath :String): String = {
         val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
         val inputPath  = new Path(compath)
         val factory = new CompressionCodecFactory(sparkSession.sparkContext.hadoopConfiguration);
       val codec = factory.getCodec(inputPath)
         if (codec == null){
           throw new RuntimeException(s"Not a valid codex $codec")
         }
    
         var in : CompressionInputStream = null;
         var out : FSDataOutputStream = null;
         try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(uncompPath));
           IOUtils.copyBytes(in, out, conf);
         } finally {
           IOUtils.closeStream(in);
           IOUtils.closeStream(out);
         }
         uncompPath
       }