如何在 hadoop 的新目录中解压缩 .gz 文件?
How to unzip .gz files in a new directory in hadoop?
我在 hdfs 的文件夹中有一堆 .gz 文件。我想将所有这些 .gz 文件解压缩到 hdfs 中的一个新文件夹中。我应该怎么做?
您可以使用配置单元执行此操作(假设它是文本数据)。
create external table source (t str) location '<directory_with_gz_files>';
create external table target (t str) location '<target_dir>';
insert into table target select * from source;
数据将被解压缩到一组新文件中。
如果您不想更改名称并且您所在的节点上有足够的存储空间运行,您可以这样做。
hadoop fs -get <your_source_directory> <directory_name>
It will create a directory where you run hadoop command. cd to it and gunzip all the files
cd ..
hadoop fs -moveFromLocal <directory_name> <target_hdfs_path>
我可以想到通过 3 种不同的方式实现它。
使用Linux命令行
以下命令对我有用。
hadoop fs -cat /tmp/Links.txt.gz | gzip -d | hadoop fs -put - /tmp/unzipped/Links.txt
我的压缩文件是 Links.txt.gz
输出存储在 /tmp/unzipped/Links.txt
使用Java程序
在Hadoop The Definitve Guide
书中,有一节是关于Codecs
的。在该部分中,有一个使用 CompressionCodecFactory
解压缩输出的程序。我是re-producing那个代码:
package com.myorg.hadooptests;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
public class FileDecompressor {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri =
CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
此代码将gz 文件路径作为输入。
您可以按以下方式执行:
FileDecompressor <gzipped file name>
例如当我执行压缩文件时:
FileDecompressor /tmp/Links.txt.gz
我在以下位置获得了解压缩的文件:/tmp/Links.txt
它将解压缩的文件存储在同一文件夹中。所以需要修改这段代码,取2个输入参数:<input file path> and <output folder>
.
让这个程序运行后,您可以编写一个 Shell/Perl/Python 脚本来为您拥有的每个输入调用这个程序。
使用 Pig 脚本
您可以编写一个简单的 Pig 脚本来实现此目的。
我编写了以下脚本,它有效:
A = LOAD '/tmp/Links.txt.gz' USING PigStorage();
Store A into '/tmp/tmp_unzipped/' USING PigStorage();
mv /tmp/tmp_unzipped/part-m-00000 /tmp/unzipped/Links.txt
rm /tmp/tmp_unzipped/
当您运行此脚本时,解压缩的内容存储在一个临时文件夹中:/tmp/tmp_unzipped
。此文件夹将包含
/tmp/tmp_unzipped/_SUCCESS
/tmp/tmp_unzipped/part-m-00000
part-m-00000
包含解压缩的文件。
因此,我们需要使用以下命令显式重命名它,最后删除 /tmp/tmp_unzipped
文件夹:
mv /tmp/tmp_unzipped/part-m-00000 /tmp/unzipped/Links.txt
rm /tmp/tmp_unzipped/
因此,如果您使用此 Pig 脚本,则只需注意参数化文件名(Links.txt.gz 和 Links.txt)。
同样,一旦您使此脚本运行,您可以编写一个 Shell/Perl/Python 脚本来为您拥有的每个输入调用此 Pig 脚本。
如果您有压缩的文本文件,hadoop fs -text 支持 gzip 以及其他常见的压缩格式(snappy、lzo)。
hadoop fs -text /tmp/a.gz | hadoop fs -put - /tmp/uncompressed_a
Bash解决方案
就我而言,我不想通过管道解压缩文件,因为我不确定它们的内容。相反,我想确保 zip 文件中的所有文件都将提取到 HDFS 上。
我创建了一个简单的 bash 脚本。评论应该给你一个线索是怎么回事。下面有一个简短的描述。
#!/bin/bash
workdir=/tmp/unziphdfs/
cd $workdir
# get all zip files in a folder
zips=$(hadoop fs -ls /yourpath/*.zip | awk '{print }')
for hdfsfile in $zips
do
echo $hdfsfile
# copy to temp folder to unpack
hdfs dfs -copyToLocal $hdfsfile $workdir
hdfsdir=$(dirname "$hdfsfile")
zipname=$(basename "$hdfsfile")
# unpack locally and remove
unzip $zipname
rm -rf $zipname
# copy files back to hdfs
files=$(ls $workdir)
for file in $files; do
hdfs dfs -copyFromLocal $file $hdfsdir
rm -rf $file
done
# optionally remove the zip file from hdfs?
# hadoop fs -rm -skipTrash $hdfsfile
done
描述
- 获取
hdfs
目录中的所有 *.zip
个文件
- 逐一:将
zip
复制到临时目录(在文件系统上)
- 解压缩
- 将所有解压缩的文件复制到压缩文件的目录
- 清理
我使用 /mypath/*/*.zip
设法让它与每个 zip 文件中的许多 zip 文件的子目录结构一起工作。
祝你好运:)
Hadoop 的 FileUtil
class 有 unTar()
and unZip()
方法来实现这一点。 unTar()
方法也适用于 .tar.gz
和 .tgz
文件。不幸的是,它们只能处理本地文件系统上的文件。您必须使用相同的 class 的 copy()
方法之一来复制您需要使用的任何分布式文件系统。
提供 Scala 代码
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, CompressionInputStream}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.io.IOUtils
val conf = new org.apache.hadoop.conf.Configuration()
def extractFile (sparkSession: SparkSession, compath : String, uncompPath :String): String = {
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
val inputPath = new Path(compath)
val factory = new CompressionCodecFactory(sparkSession.sparkContext.hadoopConfiguration);
val codec = factory.getCodec(inputPath)
if (codec == null){
throw new RuntimeException(s"Not a valid codex $codec")
}
var in : CompressionInputStream = null;
var out : FSDataOutputStream = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(uncompPath));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
uncompPath
}
我在 hdfs 的文件夹中有一堆 .gz 文件。我想将所有这些 .gz 文件解压缩到 hdfs 中的一个新文件夹中。我应该怎么做?
您可以使用配置单元执行此操作(假设它是文本数据)。
create external table source (t str) location '<directory_with_gz_files>';
create external table target (t str) location '<target_dir>';
insert into table target select * from source;
数据将被解压缩到一组新文件中。
如果您不想更改名称并且您所在的节点上有足够的存储空间运行,您可以这样做。
hadoop fs -get <your_source_directory> <directory_name>
It will create a directory where you run hadoop command. cd to it and gunzip all the files
cd ..
hadoop fs -moveFromLocal <directory_name> <target_hdfs_path>
我可以想到通过 3 种不同的方式实现它。
使用Linux命令行
以下命令对我有用。
hadoop fs -cat /tmp/Links.txt.gz | gzip -d | hadoop fs -put - /tmp/unzipped/Links.txt
我的压缩文件是
Links.txt.gz
输出存储在/tmp/unzipped/Links.txt
使用Java程序
在
Hadoop The Definitve Guide
书中,有一节是关于Codecs
的。在该部分中,有一个使用CompressionCodecFactory
解压缩输出的程序。我是re-producing那个代码:package com.myorg.hadooptests; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; public class FileDecompressor { public static void main(String[] args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path inputPath = new Path(uri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(inputPath); if (codec == null) { System.err.println("No codec found for " + uri); System.exit(1); } String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); InputStream in = null; OutputStream out = null; try { in = codec.createInputStream(fs.open(inputPath)); out = fs.create(new Path(outputUri)); IOUtils.copyBytes(in, out, conf); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); } } }
此代码将gz 文件路径作为输入。
您可以按以下方式执行:FileDecompressor <gzipped file name>
例如当我执行压缩文件时:
FileDecompressor /tmp/Links.txt.gz
我在以下位置获得了解压缩的文件:
/tmp/Links.txt
它将解压缩的文件存储在同一文件夹中。所以需要修改这段代码,取2个输入参数:
<input file path> and <output folder>
.让这个程序运行后,您可以编写一个 Shell/Perl/Python 脚本来为您拥有的每个输入调用这个程序。
使用 Pig 脚本
您可以编写一个简单的 Pig 脚本来实现此目的。
我编写了以下脚本,它有效:
A = LOAD '/tmp/Links.txt.gz' USING PigStorage(); Store A into '/tmp/tmp_unzipped/' USING PigStorage(); mv /tmp/tmp_unzipped/part-m-00000 /tmp/unzipped/Links.txt rm /tmp/tmp_unzipped/
当您运行此脚本时,解压缩的内容存储在一个临时文件夹中:
/tmp/tmp_unzipped
。此文件夹将包含/tmp/tmp_unzipped/_SUCCESS /tmp/tmp_unzipped/part-m-00000
part-m-00000
包含解压缩的文件。因此,我们需要使用以下命令显式重命名它,最后删除
/tmp/tmp_unzipped
文件夹:mv /tmp/tmp_unzipped/part-m-00000 /tmp/unzipped/Links.txt rm /tmp/tmp_unzipped/
因此,如果您使用此 Pig 脚本,则只需注意参数化文件名(Links.txt.gz 和 Links.txt)。
同样,一旦您使此脚本运行,您可以编写一个 Shell/Perl/Python 脚本来为您拥有的每个输入调用此 Pig 脚本。
如果您有压缩的文本文件,hadoop fs -text 支持 gzip 以及其他常见的压缩格式(snappy、lzo)。
hadoop fs -text /tmp/a.gz | hadoop fs -put - /tmp/uncompressed_a
Bash解决方案
就我而言,我不想通过管道解压缩文件,因为我不确定它们的内容。相反,我想确保 zip 文件中的所有文件都将提取到 HDFS 上。
我创建了一个简单的 bash 脚本。评论应该给你一个线索是怎么回事。下面有一个简短的描述。
#!/bin/bash
workdir=/tmp/unziphdfs/
cd $workdir
# get all zip files in a folder
zips=$(hadoop fs -ls /yourpath/*.zip | awk '{print }')
for hdfsfile in $zips
do
echo $hdfsfile
# copy to temp folder to unpack
hdfs dfs -copyToLocal $hdfsfile $workdir
hdfsdir=$(dirname "$hdfsfile")
zipname=$(basename "$hdfsfile")
# unpack locally and remove
unzip $zipname
rm -rf $zipname
# copy files back to hdfs
files=$(ls $workdir)
for file in $files; do
hdfs dfs -copyFromLocal $file $hdfsdir
rm -rf $file
done
# optionally remove the zip file from hdfs?
# hadoop fs -rm -skipTrash $hdfsfile
done
描述
- 获取
hdfs
目录中的所有*.zip
个文件 - 逐一:将
zip
复制到临时目录(在文件系统上) - 解压缩
- 将所有解压缩的文件复制到压缩文件的目录
- 清理
我使用 /mypath/*/*.zip
设法让它与每个 zip 文件中的许多 zip 文件的子目录结构一起工作。
祝你好运:)
Hadoop 的 FileUtil
class 有 unTar()
and unZip()
方法来实现这一点。 unTar()
方法也适用于 .tar.gz
和 .tgz
文件。不幸的是,它们只能处理本地文件系统上的文件。您必须使用相同的 class 的 copy()
方法之一来复制您需要使用的任何分布式文件系统。
提供 Scala 代码
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, CompressionInputStream}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.io.IOUtils
val conf = new org.apache.hadoop.conf.Configuration()
def extractFile (sparkSession: SparkSession, compath : String, uncompPath :String): String = {
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
val inputPath = new Path(compath)
val factory = new CompressionCodecFactory(sparkSession.sparkContext.hadoopConfiguration);
val codec = factory.getCodec(inputPath)
if (codec == null){
throw new RuntimeException(s"Not a valid codex $codec")
}
var in : CompressionInputStream = null;
var out : FSDataOutputStream = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(uncompPath));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
uncompPath
}