SequenceFile 将多个小文件压缩到一个 file.seq
SequenceFile Compactor of several small files in only one file.seq
HDFS 和 Hadoop 中的 Novell:
我正在开发一个程序,该程序应该获取特定目录的所有文件,我们可以在其中找到任何类型的几个小文件。
获取每个文件并追加到一个压缩的SequenceFile中,其中key必须是文件的路径,value必须是获取到的文件,目前我的代码是:
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.BZip2Codec;
public class Compact {
public static void main (String [] args) throws Exception{
try{
Configuration conf = new Configuration();
FileSystem fs =
FileSystem.get(new URI("hdfs://quickstart.cloudera:8020"),conf);
Path destino = new Path("/user/cloudera/data/testPractice.seq");//test args[1]
if ((fs.exists(destino))){
System.out.println("exist : " + destino);
return;
}
BZip2Codec codec=new BZip2Codec();
SequenceFile.Writer outSeq = SequenceFile.createWriter(conf
,SequenceFile.Writer.file(fs.makeQualified(destino))
,SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK,codec)
,SequenceFile.Writer.keyClass(Text.class)
,SequenceFile.Writer.valueClass(FSDataInputStream.class));
FileStatus[] status = fs.globStatus(new Path("/user/cloudera/data/*.txt"));//args[0]
for (int i=0;i<status.length;i++){
FSDataInputStream in = fs.open(status[i].getPath());
outSeq.append(new org.apache.hadoop.io.Text(status[i].getPath().toString()), new FSDataInputStream(in));
fs.close();
}
outSeq.close();
System.out.println("End Program");
}catch(Exception e){
System.out.println(e.toString());
System.out.println("File not found");
}
}
}
但是每次执行后我都会收到这个异常:
java.io.IOException: Could not find a serializer for the Value class: 'org.apache.hadoop.fs.FSDataInputStream'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
File not found
我知道错误一定出在我正在创建的文件类型和我为添加到 sequenceFile 中定义的对象类型,但我不知道应该添加哪一个,谁能帮我吗?
FSDataInputStream 与任何其他 InputStream 一样,不打算进行序列化。在字节流上序列化 "iterator" 应该做什么?
您最有可能想要做的是将文件的 内容 存储为值。例如,您可以将值类型从 FsDataInputStream 切换为 BytesWritable,然后从 FSDataInputStream 中获取所有字节。为此目的使用 Key/Value SequenceFile 的一个缺点是每个文件的内容都必须适合内存。对于小文件可能没问题,但你必须意识到这个问题。
我不确定您真正想要实现的目标是什么,但也许您可以通过使用 Hadoop Archives?
之类的东西来避免重新发明轮子
非常感谢你的评论,问题是你说的序列化程序,最后我使用了 BytesWritable:
FileStatus[] status = fs.globStatus(new Path("/user/cloudera/data/*.txt"));//args[0]
for (int i=0;i<status.length;i++){
FSDataInputStream in = fs.open(status[i].getPath());
byte[] content = new byte [(int)fs.getFileStatus(status[i].getPath()).getLen()];
outSeq.append(new org.apache.hadoop.io.Text(status[i].getPath().toString()), new org.apache.hadoop.io.BytesWritable(in));
}
outSeq.close();
在 hadoop 生态系统中可能还有其他更好的解决方案,但这个问题是我正在开发的一个学位的练习,现在我们正在重新造轮子以理解概念 ;-)。
HDFS 和 Hadoop 中的 Novell: 我正在开发一个程序,该程序应该获取特定目录的所有文件,我们可以在其中找到任何类型的几个小文件。
获取每个文件并追加到一个压缩的SequenceFile中,其中key必须是文件的路径,value必须是获取到的文件,目前我的代码是:
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.BZip2Codec;
public class Compact {
public static void main (String [] args) throws Exception{
try{
Configuration conf = new Configuration();
FileSystem fs =
FileSystem.get(new URI("hdfs://quickstart.cloudera:8020"),conf);
Path destino = new Path("/user/cloudera/data/testPractice.seq");//test args[1]
if ((fs.exists(destino))){
System.out.println("exist : " + destino);
return;
}
BZip2Codec codec=new BZip2Codec();
SequenceFile.Writer outSeq = SequenceFile.createWriter(conf
,SequenceFile.Writer.file(fs.makeQualified(destino))
,SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK,codec)
,SequenceFile.Writer.keyClass(Text.class)
,SequenceFile.Writer.valueClass(FSDataInputStream.class));
FileStatus[] status = fs.globStatus(new Path("/user/cloudera/data/*.txt"));//args[0]
for (int i=0;i<status.length;i++){
FSDataInputStream in = fs.open(status[i].getPath());
outSeq.append(new org.apache.hadoop.io.Text(status[i].getPath().toString()), new FSDataInputStream(in));
fs.close();
}
outSeq.close();
System.out.println("End Program");
}catch(Exception e){
System.out.println(e.toString());
System.out.println("File not found");
}
}
}
但是每次执行后我都会收到这个异常:
java.io.IOException: Could not find a serializer for the Value class: 'org.apache.hadoop.fs.FSDataInputStream'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization. File not found
我知道错误一定出在我正在创建的文件类型和我为添加到 sequenceFile 中定义的对象类型,但我不知道应该添加哪一个,谁能帮我吗?
FSDataInputStream 与任何其他 InputStream 一样,不打算进行序列化。在字节流上序列化 "iterator" 应该做什么?
您最有可能想要做的是将文件的 内容 存储为值。例如,您可以将值类型从 FsDataInputStream 切换为 BytesWritable,然后从 FSDataInputStream 中获取所有字节。为此目的使用 Key/Value SequenceFile 的一个缺点是每个文件的内容都必须适合内存。对于小文件可能没问题,但你必须意识到这个问题。
我不确定您真正想要实现的目标是什么,但也许您可以通过使用 Hadoop Archives?
之类的东西来避免重新发明轮子非常感谢你的评论,问题是你说的序列化程序,最后我使用了 BytesWritable:
FileStatus[] status = fs.globStatus(new Path("/user/cloudera/data/*.txt"));//args[0]
for (int i=0;i<status.length;i++){
FSDataInputStream in = fs.open(status[i].getPath());
byte[] content = new byte [(int)fs.getFileStatus(status[i].getPath()).getLen()];
outSeq.append(new org.apache.hadoop.io.Text(status[i].getPath().toString()), new org.apache.hadoop.io.BytesWritable(in));
}
outSeq.close();
在 hadoop 生态系统中可能还有其他更好的解决方案,但这个问题是我正在开发的一个学位的练习,现在我们正在重新造轮子以理解概念 ;-)。