Hadoop - Decompressing zip files

I have many compressed files in zip format (running into GBs) and I want to write a map-only job to decompress them. My mapper class looks like this:

import java.util.zip.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.*;

public class DecompressMapper extends Mapper <LongWritable, Text, LongWritable, Text>
{
    private static final int BUFFER_SIZE = 4096;

    public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Context context) throws IOException
    {
        FileSplit fileSplit = (FileSplit)context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        this.unzip(fileName, new File(fileName).getParent()  + File.separator +  "/test_poc");  
    }

    public void unzip(String zipFilePath, String destDirectory) throws IOException {
        File destDir = new File(destDirectory);
        if (!destDir.exists()) {
            destDir.mkdir();
        }
        ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath));
        ZipEntry entry = zipIn.getNextEntry();
        // iterates over entries in the zip file
        while (entry != null) {
            String filePath = destDirectory + File.separator + entry.getName();
            if (!entry.isDirectory()) {
                // if the entry is a file, extracts it
                extractFile(zipIn, filePath);
            } else {
                // if the entry is a directory, make the directory
                File dir = new File(filePath);
                dir.mkdir();
            }
            zipIn.closeEntry();
            entry = zipIn.getNextEntry();
        }
        zipIn.close();
    }

    private void extractFile(ZipInputStream zipIn, String filePath) throws IOException {
        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
        byte[] bytesIn = new byte[BUFFER_SIZE];
        int read = 0;
        while ((read = zipIn.read(bytesIn)) != -1) {
            bos.write(bytesIn, 0, read);
        }
        bos.close();
    }
}

and my driver class:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DecompressJob extends Configured implements Tool{

    public static void main(String[] args) throws Exception
    {
      int res = ToolRunner.run(new Configuration(), new DecompressJob(),args);
      System.exit(res);
    }

    public int run(String[] args) throws Exception
    {
        Job conf = Job.getInstance(getConf());
        conf.setJobName("MapperOnly");

        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(DecompressMapper.class);
        conf.setNumReduceTasks(0);

        Path inp = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.addInputPath(conf, inp);
        FileOutputFormat.setOutputPath(conf, out);

        return conf.waitForCompletion(true) ? 0: 1;
    }
}

My mapper class does not seem to work correctly. I am not getting the decompressed files in the desired directory. Any help is appreciated. Thanks...

There are a few problems with the above code:

  1. I am using the MR1 API and the MR2 API together. Never do that.
  2. I used Java IO functions. Hadoop does not recognize Java IO calls against its file system.
  3. The paths are not generated correctly.

We need to be careful when writing MapReduce programs: Hadoop uses a completely different file system, and we have to keep that in mind while writing code; and never mix the MR1 and MR2 APIs.
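To illustrate the file-system point, here is a minimal sketch (not part of the original code; the class name and command-line argument are placeholders) of opening a file through Hadoop's FileSystem API instead of java.io:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);
        // Resolve the file system from the path itself (HDFS, local, etc.)
        FileSystem fs = path.getFileSystem(conf);

        // fs.open() returns an FSDataInputStream backed by the cluster's file system,
        // unlike java.io.FileInputStream, which only sees the local disk of the node
        try (FSDataInputStream in = fs.open(path)) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                // process the bytes here
            }
        }
    }
}

This is exactly the kind of access the custom record reader below uses when it calls fs.open(path).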

Well, there is no ready-made way to decompress files inside the Hadoop file system, but after a long spell of research I figured out how to decompress them directly in HDFS. The prerequisite is that you copy the zip files to a location in HDFS and then run a MapReduce job. Obviously Hadoop does not understand the zip file format as an input, so we need to customize the Mapper and Reducer so that we control what the mapper emits and what the reducer consumes. Note that this MapReduce job will run on a single Mapper, because when customizing the Record Reader class provided by Hadoop we disable splitting, i.e. make isSplitable return false. The MapReduce job therefore uses the file name as the key and the content of the uncompressed file as the value. When the reducer consumes it, the output value is written as null, so only the uncompressed content remains in the reducer output, and the number of reducers is set to one so everything is dumped into a single part file.

We all know that Hadoop cannot handle zip files on its own, but Java can with the help of its java.util.zip classes: the archive can be read through a ZipInputStream and each compressed file inside it through a ZipEntry. So we write a custom ZipFileInputFormat class that extends FileInputFormat (shown right after the short sketch below).
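As a standalone illustration (not part of the job itself; the class name is hypothetical and the file name comes from the command line), this is roughly how the java.util.zip API walks over the entries of an archive:

import java.io.FileInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class ZipListSketch {
    public static void main(String[] args) throws Exception {
        // Iterate over every entry in the archive and print its name
        try (ZipInputStream zip = new ZipInputStream(new FileInputStream(args[0]))) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                System.out.println(entry.getName());
                zip.closeEntry();
            }
        }
    }
}

The ZipFileInputFormat and ZipFileRecordReader below wrap this same pattern, except that the stream comes from HDFS rather than from the local disk.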

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ZipFileInputFormat extends FileInputFormat<Text, BytesWritable> {
/** See the comments on the setLenient() method */
private static boolean isLenient = false;

/**
 * ZIP files are not splittable, so this always returns false and each
 * archive is processed by a single mapper.
 */
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    return false;
}

/**
 * Create the ZipFileRecordReader to parse the file
 */
@Override
public RecordReader<Text, BytesWritable> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException {
    return new ZipFileRecordReader();
}

/**
 * When set to true, certain ZIP-related exceptions are ignored instead of
 * failing the whole job (see the record reader below).
 *
 * @param lenient whether to silently skip corrupt entries
 */
public static void setLenient(boolean lenient) {
    isLenient = lenient;
}

public static boolean getLenient() {
    return isLenient;
}
}

Note that createRecordReader() returns a ZipFileRecordReader, the customized version of Hadoop's RecordReader class we have been talking about. Now let's look at the RecordReader class:

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ZipFileRecordReader extends RecordReader<Text, BytesWritable> {
/** InputStream used to read the ZIP file from the FileSystem */
private FSDataInputStream fsin;

/** ZIP file parser/decompressor */
private ZipInputStream zip;

/** Uncompressed file name */
private Text currentKey;

/** Uncompressed file contents */
private BytesWritable currentValue;

/** Used to indicate progress */
private boolean isFinished = false;

/**
 * Initialise and open the ZIP file from the FileSystem
 */
@Override
public void initialize(InputSplit inputSplit,
        TaskAttemptContext taskAttemptContext) throws IOException,
        InterruptedException {
    FileSplit split = (FileSplit) inputSplit;
    Configuration conf = taskAttemptContext.getConfiguration();
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);

    // Open the stream
    fsin = fs.open(path);
    zip = new ZipInputStream(fsin);
}

/**
 * Each ZipEntry is decompressed and readied for the Mapper. The contents of
 * each file are held *in memory* in a BytesWritable object.
 * 
 * If the ZipFileInputFormat has been set to Lenient (not the default),
 * certain exceptions will be gracefully ignored to prevent a larger job
 * from failing.
 */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    ZipEntry entry = null;
    try {
        entry = zip.getNextEntry();
    } catch (ZipException e) {
        if (ZipFileInputFormat.getLenient() == false)
            throw e;
    }

    // Sanity check
    if (entry == null) {
        isFinished = true;
        return false;
    }

    // Filename
    currentKey = new Text(entry.getName());

    // A nested .zip entry is passed through as raw bytes; it is not extracted recursively
    if (currentKey.toString().endsWith(".zip")) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] temp1 = new byte[8192];
        while (true) {
            int bytesread1 = 0;
            try {
                bytesread1 = zip.read(temp1, 0, 8192);
            } catch (EOFException e) {
                if (ZipFileInputFormat.getLenient() == false)
                    throw e;
                return false;
            }
            if (bytesread1 > 0)
                bos.write(temp1, 0, bytesread1);
            else
                break;
        }

        zip.closeEntry();
        currentValue = new BytesWritable(bos.toByteArray());
        return true;

    }

    // Read the file contents
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    byte[] temp = new byte[8192];
    while (true) {
        int bytesRead = 0;
        try {
            bytesRead = zip.read(temp, 0, 8192);
        } catch (EOFException e) {
            if (ZipFileInputFormat.getLenient() == false)
                throw e;
            return false;
        }
        if (bytesRead > 0)
            bos.write(temp, 0, bytesRead);
        else
            break;
    }
    zip.closeEntry();

    // Uncompressed contents
    currentValue = new BytesWritable(bos.toByteArray());
    return true;
}

/**
 * Rather than calculating progress, we just keep it simple
 */
@Override
public float getProgress() throws IOException, InterruptedException {
    return isFinished ? 1 : 0;
}

/**
 * Returns the current key (name of the zipped file)
 */
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
    return currentKey;
}

/**
 * Returns the current value (contents of the zipped file)
 */
@Override
public BytesWritable getCurrentValue() throws IOException,
        InterruptedException {
    return currentValue;
}

/**
 * Close quietly, ignoring any exceptions
 */
@Override
public void close() throws IOException {
    try {
        zip.close();
    } catch (Exception ignore) {
    }
    try {
        fsin.close();
    } catch (Exception ignore) {
    }
}
}

Okay, for convenience I have put some comments in the source code so you can easily see how the files are read and written with in-memory buffers. Now let's write a Mapper class that consumes the output of the classes above:

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Text, BytesWritable, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);

public void map(Text key, BytesWritable value, Context context)
        throws IOException, InterruptedException {

    String filename = key.toString();

    // We only want to process .txt files
    if (filename.endsWith(".txt") == false)
        return;

    // Prepare the content
    // Respect getLength(): BytesWritable's backing array can be larger than the payload
    String content = new String(value.getBytes(), 0, value.getLength(), "UTF-8");

    context.write(new Text(content), one);
}
}

Let's quickly write the Reducer for it as well:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    // context.write(key, new IntWritable(sum));
    context.write(new Text(key), null);
}
}

Let's quickly configure the job for this Mapper and Reducer:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.saama.CustomisedMapperReducer.MyMapper;
import com.saama.CustomisedMapperReducer.MyReducer;
import com.saama.CustomisedMapperReducer.ZipFileInputFormat;
import com.saama.CustomisedMapperReducer.ZipFileRecordReader;

public class MyJob {

@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException,
        ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();

    Job job = new Job(conf);
    job.setJarByClass(MyJob.class);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    job.setInputFormatClass(ZipFileInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    ZipFileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setNumReduceTasks(1);

    job.waitForCompletion(true);

}
}

Note that in the job class we have configured the InputFormatClass as our ZipFileInputFormat class and the OutputFormatClass as TextOutputFormat.

Mavenize the project and keep the dependencies as they are to run the code. Export the jar file and deploy it on your Hadoop cluster. Tested and deployed on CDH 5.5 (YARN). The contents of the POM file are:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.mithun</groupId>
<artifactId>CustomisedMapperReducer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>CustomisedMapperReducer</name>
<url>http://maven.apache.org</url>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.9.13</version>
    </dependency>


    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
</project>
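For reference, building and submitting the job would look roughly like the following. The jar name follows the artifactId/version in the POM above, and the fully qualified main-class name is an assumption based on the package used in the imports, so adjust both to your project:

mvn clean package
hadoop jar target/CustomisedMapperReducer-0.0.1-SNAPSHOT.jar com.saama.CustomisedMapperReducer.MyJob /path/to/zips /path/to/output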