使用 Hadoop 以编程方式解压缩包含多个不相关的 csv 文件的文件
Untar files containing multiple unrelated csv files programatically with Hadoop
我的 hdfs 中有几个压缩文件 (.tar.gz),其中包含不相关的 tsv 文件(类似于下面的列表)。我想以编程方式解压这些文件夹,可能利用 MPP 架构(例如 Hadoop 或 Spark)并将它们保存到 hdfs 中。
- browser.tsv
- connection_type.tsv
- country.tsv
- color_depth.tsv
- javascript_version.tsv
- languages.tsv
- operating_systems.tsv
- plugins.tsv
- referrer_type.tsv
- resolution.tsv
- search_engine.tsv
到目前为止,我只能提出一个 bash 脚本,该脚本从 hdfs 下载每个文件,解压缩并将文件夹保存回 hdfs。我什至可以并行化脚本,但我对解决方案也不满意。
谢谢:)
编辑:
如果能看到以下任何一项的解决方案会很有趣:
- Spark 2.4.5
- 蜂巢 2.3.6
- 猪 0.17.0
- Hadoop 2.8.5
我能看到的唯一方法是迭代每个文件并使用 Spark 读取,然后使用 Spark 本身将其写回未压缩的 HDFS。所以像这样(使用 PySpark):
for p in paths
df = spark.read.csv(p, sep=r'\t', header=True)
df.write.csv(p, sep=r'\t', header=True)
注意:我还没有测试过这段代码,在 HDFS 和 tar 文件中复制它很复杂,可能需要添加一些额外的参数解析 tar 个文件,但我希望思路清晰。
恕我直言,不可能在一次迭代中同时读取所有这些文件,因为它们具有不同的结构(以及它们代表的不同数据)。
我终于找到了我的问题的解决方案,它包含一个仅映射器的 Hadoop 作业。每个映射器在 tar 文件夹中获取一个未压缩的文件,并使用 Hadoop 的 MultipleOutput
实用程序将其写入特定路径。
此外,我实现了一个自定义的不可拆分的 Hadoop 输入格式来处理 Tarball 提取,称为 TarballInputFormat
。
public class TarballInputFormat extends FileInputFormat<Text, Text> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) {
TarballRecordReader recordReader = new TarballRecordReader();
recordReader.initialize(inputSplit, taskAttemptContext);
return recordReader;
}
}
TarballRecordReader 处理原始 tarball 文件中所有文件的提取。
public class TarballRecordReader extends RecordReader<Text, Text> {
private static final Log log = LogFactory.getLog(TarballRecordReader.class);
private TarInputStream tarInputStream;
private Text key;
private Text value;
private boolean finished = false;
private String folderName;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
key = new Text();
value = new Text();
try {
FileSplit split = (FileSplit) inputSplit;
Configuration conf = taskAttemptContext.getConfiguration();
Path tarballPath = split.getPath();
folderName = tarballPath.getName().split("\.")[0];
FileSystem fs = tarballPath.getFileSystem(conf);
FSDataInputStream fsInputStream = fs.open(tarballPath);
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
CompressionCodec codec = compressionCodecs.getCodec(tarballPath);
tarInputStream = new TarInputStream(codec.createInputStream(fsInputStream));
}
catch (IOException ex) {
log.error(ex.getMessage());
}
}
@Override
public boolean nextKeyValue() throws IOException {
TarEntry tarEntry = tarInputStream.getNextEntry();
while (tarEntry != null && tarEntry.isDirectory())
tarEntry = tarInputStream.getNextEntry();
finished = tarEntry == null;
if (finished) {
return false;
}
key.clear();
value.clear();
long tarSize = tarEntry.getSize();
int read;
int offset = 0;
int bufSize = (int) tarSize;
byte[] buffer = new byte[bufSize];
while ((read = tarInputStream.read(buffer, offset, bufSize)) != -1) offset += read;
value.set(buffer);
key.set(folderName + "/" + tarEntry.getName());
return true;
}
@Override
public Text getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
@Override
public float getProgress() {
return finished? 1: 0;
}
@Override
public void close() throws IOException {
if (tarInputStream != null) {
tarInputStream.close();
}
}
}
每个 tar 球都将被提取,通过将每个文件相对于其父文件夹写入来保持原始结构。在此解决方案中,我们使用映射器同时读取和写入提取的文件。这显然性能较低,但对于那些需要以原始形式(有序输出)保存提取文件的人来说,这可能是一个很好的权衡。另一种方法可以利用 reducer 将每个提取的文件行写入文件系统,这应该以一致性(无序文件内容)为代价增加写入吞吐量。
public class ExtractTarball extends Configured implements Tool {
public static final Log log = LogFactory.getLog(ExtractTarball.class);
private static final String LOOKUP_OUTPUT = "lookup";
public static class MapClass extends Mapper<Text, Text, Text, Text> {
private MultipleOutputs<Text, Text> mos;
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String filename = key.toString();
int length = value.getBytes().length;
System.out.printf("%s: %s%n", filename, length);
mos.write(LOOKUP_OUTPUT, "", value, key.toString());
}
public void setup(Context context) {
mos = new MultipleOutputs<>(context);
}
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "ExtractTarball");
job.setJarByClass(this.getClass());
job.setMapperClass(MapClass.class);
job.setInputFormatClass(TarballInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job, LOOKUP_OUTPUT, TextOutputFormat.class, Text.class, Text.class);
log.isDebugEnabled();
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new ExtractTarball(), args);
System.out.println(exitCode);
System.exit(exitCode);
}
}
这是输出文件夹的样子:
- output
- lookup_data
- .browser.tsv-m-00000.crc
- .browser_type.tsv-m-00000.crc
- .color_depth.tsv-m-00000.crc
- .column_headers.tsv-m-00000.crc
- .connection_type.tsv-m-00000.crc
- .country.tsv-m-00000.crc
- .event.tsv-m-00000.crc
- .javascript_version.tsv-m-00000.crc
- .languages.tsv-m-00000.crc
- .operating_systems.tsv-m-00000.crc
- .plugins.tsv-m-00000.crc
- .referrer_type.tsv-m-00000.crc
- .resolution.tsv-m-00000.crc
- .search_engines.tsv-m-00000.crc
- browser.tsv-m-00000
- browser_type.tsv-m-00000
- color_depth.tsv-m-00000
- column_headers.tsv-m-00000
- connection_type.tsv-m-00000
- country.tsv-m-00000
- event.tsv-m-00000
- javascript_version.tsv-m-00000
- languages.tsv-m-00000
- operating_systems.tsv-m-00000
- plugins.tsv-m-00000
- referrer_type.tsv-m-00000
- resolution.tsv-m-00000
- search_engines.tsv-m-00000
- ._SUCCESS.crc
- .part-m-00000.crc
- _SUCCESS
- part-m-00000
我的 hdfs 中有几个压缩文件 (.tar.gz),其中包含不相关的 tsv 文件(类似于下面的列表)。我想以编程方式解压这些文件夹,可能利用 MPP 架构(例如 Hadoop 或 Spark)并将它们保存到 hdfs 中。
- browser.tsv
- connection_type.tsv
- country.tsv
- color_depth.tsv
- javascript_version.tsv
- languages.tsv
- operating_systems.tsv
- plugins.tsv
- referrer_type.tsv
- resolution.tsv
- search_engine.tsv
到目前为止,我只能提出一个 bash 脚本,该脚本从 hdfs 下载每个文件,解压缩并将文件夹保存回 hdfs。我什至可以并行化脚本,但我对解决方案也不满意。
谢谢:)
编辑:
如果能看到以下任何一项的解决方案会很有趣:
- Spark 2.4.5
- 蜂巢 2.3.6
- 猪 0.17.0
- Hadoop 2.8.5
我能看到的唯一方法是迭代每个文件并使用 Spark 读取,然后使用 Spark 本身将其写回未压缩的 HDFS。所以像这样(使用 PySpark):
for p in paths
df = spark.read.csv(p, sep=r'\t', header=True)
df.write.csv(p, sep=r'\t', header=True)
注意:我还没有测试过这段代码,在 HDFS 和 tar 文件中复制它很复杂,可能需要添加一些额外的参数解析 tar 个文件,但我希望思路清晰。
恕我直言,不可能在一次迭代中同时读取所有这些文件,因为它们具有不同的结构(以及它们代表的不同数据)。
我终于找到了我的问题的解决方案,它包含一个仅映射器的 Hadoop 作业。每个映射器在 tar 文件夹中获取一个未压缩的文件,并使用 Hadoop 的 MultipleOutput
实用程序将其写入特定路径。
此外,我实现了一个自定义的不可拆分的 Hadoop 输入格式来处理 Tarball 提取,称为 TarballInputFormat
。
public class TarballInputFormat extends FileInputFormat<Text, Text> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) {
TarballRecordReader recordReader = new TarballRecordReader();
recordReader.initialize(inputSplit, taskAttemptContext);
return recordReader;
}
}
TarballRecordReader 处理原始 tarball 文件中所有文件的提取。
public class TarballRecordReader extends RecordReader<Text, Text> {
private static final Log log = LogFactory.getLog(TarballRecordReader.class);
private TarInputStream tarInputStream;
private Text key;
private Text value;
private boolean finished = false;
private String folderName;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
key = new Text();
value = new Text();
try {
FileSplit split = (FileSplit) inputSplit;
Configuration conf = taskAttemptContext.getConfiguration();
Path tarballPath = split.getPath();
folderName = tarballPath.getName().split("\.")[0];
FileSystem fs = tarballPath.getFileSystem(conf);
FSDataInputStream fsInputStream = fs.open(tarballPath);
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
CompressionCodec codec = compressionCodecs.getCodec(tarballPath);
tarInputStream = new TarInputStream(codec.createInputStream(fsInputStream));
}
catch (IOException ex) {
log.error(ex.getMessage());
}
}
@Override
public boolean nextKeyValue() throws IOException {
TarEntry tarEntry = tarInputStream.getNextEntry();
while (tarEntry != null && tarEntry.isDirectory())
tarEntry = tarInputStream.getNextEntry();
finished = tarEntry == null;
if (finished) {
return false;
}
key.clear();
value.clear();
long tarSize = tarEntry.getSize();
int read;
int offset = 0;
int bufSize = (int) tarSize;
byte[] buffer = new byte[bufSize];
while ((read = tarInputStream.read(buffer, offset, bufSize)) != -1) offset += read;
value.set(buffer);
key.set(folderName + "/" + tarEntry.getName());
return true;
}
@Override
public Text getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
@Override
public float getProgress() {
return finished? 1: 0;
}
@Override
public void close() throws IOException {
if (tarInputStream != null) {
tarInputStream.close();
}
}
}
每个 tar 球都将被提取,通过将每个文件相对于其父文件夹写入来保持原始结构。在此解决方案中,我们使用映射器同时读取和写入提取的文件。这显然性能较低,但对于那些需要以原始形式(有序输出)保存提取文件的人来说,这可能是一个很好的权衡。另一种方法可以利用 reducer 将每个提取的文件行写入文件系统,这应该以一致性(无序文件内容)为代价增加写入吞吐量。
public class ExtractTarball extends Configured implements Tool {
public static final Log log = LogFactory.getLog(ExtractTarball.class);
private static final String LOOKUP_OUTPUT = "lookup";
public static class MapClass extends Mapper<Text, Text, Text, Text> {
private MultipleOutputs<Text, Text> mos;
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String filename = key.toString();
int length = value.getBytes().length;
System.out.printf("%s: %s%n", filename, length);
mos.write(LOOKUP_OUTPUT, "", value, key.toString());
}
public void setup(Context context) {
mos = new MultipleOutputs<>(context);
}
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "ExtractTarball");
job.setJarByClass(this.getClass());
job.setMapperClass(MapClass.class);
job.setInputFormatClass(TarballInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job, LOOKUP_OUTPUT, TextOutputFormat.class, Text.class, Text.class);
log.isDebugEnabled();
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new ExtractTarball(), args);
System.out.println(exitCode);
System.exit(exitCode);
}
}
这是输出文件夹的样子:
- output
- lookup_data
- .browser.tsv-m-00000.crc
- .browser_type.tsv-m-00000.crc
- .color_depth.tsv-m-00000.crc
- .column_headers.tsv-m-00000.crc
- .connection_type.tsv-m-00000.crc
- .country.tsv-m-00000.crc
- .event.tsv-m-00000.crc
- .javascript_version.tsv-m-00000.crc
- .languages.tsv-m-00000.crc
- .operating_systems.tsv-m-00000.crc
- .plugins.tsv-m-00000.crc
- .referrer_type.tsv-m-00000.crc
- .resolution.tsv-m-00000.crc
- .search_engines.tsv-m-00000.crc
- browser.tsv-m-00000
- browser_type.tsv-m-00000
- color_depth.tsv-m-00000
- column_headers.tsv-m-00000
- connection_type.tsv-m-00000
- country.tsv-m-00000
- event.tsv-m-00000
- javascript_version.tsv-m-00000
- languages.tsv-m-00000
- operating_systems.tsv-m-00000
- plugins.tsv-m-00000
- referrer_type.tsv-m-00000
- resolution.tsv-m-00000
- search_engines.tsv-m-00000
- ._SUCCESS.crc
- .part-m-00000.crc
- _SUCCESS
- part-m-00000