获取使用 Spark 加载的文件的详细信息
Get the details of a file loaded with Spark
要在 Spark 中加载文件,我使用这些内置方法:
JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(SOURCE_PATH);
或
JavaPairRDD<String, String> miao = jsc.wholeTextFiles(SOURCE_PATH);
这样我就得到了从文件夹中读取的文件的字节或字符串表示,它存储在 PairRDD 的值(value)中,而键(key)则是文件名。
我怎样才能得到这些文件的详细信息?例如:
File miao = new File(path);
//this kind of details
String date = miao.getLastModified();
我是否应该将它们重新转换回文件,然后读取,然后将它们制成另一个字节数组?有没有更快的流程?
使用 map 转换来处理此 RDD。在您的 map 函数中调用一个接受字符串(即您的文件名)的函数,并使用此字符串打开和处理文件。所以它只不过是一个 map RDD 转换,为这个 RDD 的每一行调用一个函数。
您可以编写自定义输入格式并将该 inputFormatClass 传递给 SparkContext 上的 newAPIHadoopFile 方法。此 inputFormat 将使用自定义 RecordReader,自定义 RecordReader 将读取 fileContent 以及其他文件相关信息(即作者、修改日期等)。您还需要编写一个自定义的 Writable 类,用来保存文件信息以及 RecordReader 读取到的 fileContent。
完整的工作代码如下。此代码使用名为 RichFileInputFormat 的自定义输入格式 class。 RichFileInputFormat 是一个 wholeFileInputFormat,这意味着每个输入文件只有一个拆分。这进一步意味着 rdd 分区的数量将等于输入文件的数量。因此,如果您的输入路径包含 10 个文件,那么不管输入文件的大小如何,生成的 rdd 将包含 10 个分区。
这是从 SparkContext 调用此自定义 inputFormat 以加载文件的方式:-
JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(args[1], RichFileInputFormat.class, Text.class,FileInfoWritable.class, new Configuration());
因此您的 rdd 键将是文件路径,值将是一个包含文件内容和其他文件相关信息的 FileInfoWritable。
下面粘贴了完整的工作代码:-
- 自定义输入格式类
package nk.Whosebug.spark;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * Input format that hands every input file to a single record reader.
 *
 * <p>Records are produced by {@link RichFileRecordReader}; the value type is
 * {@link FileInfoWritable}. Because {@link #isSplitable(JobContext, Path)} always returns
 * {@code false}, a file is never divided into several splits.
 */
public class RichFileInputFormat extends FileInputFormat<Text, FileInfoWritable> {

    /**
     * Creates the reader for one split.
     *
     * @param split   the split to read; not inspected here
     * @param context the task context; not inspected here
     * @return a new, uninitialized {@link RichFileRecordReader}
     */
    @Override
    public RecordReader<Text, FileInfoWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new RichFileRecordReader();
    }

    /**
     * Disables splitting for every file.
     *
     * <p>{@code @Override} is deliberate: if the signature ever drifts from the parent's, the
     * compiler reports it instead of silently leaving the parent's splitting behavior in place.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
- RecordReader 类
package nk.Whosebug.spark;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream; import
org.apache.hadoop.fs.FileStatus; import
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import
org.apache.hadoop.mapreduce.InputSplit; import
org.apache.hadoop.mapreduce.RecordReader; import
org.apache.hadoop.mapreduce.TaskAttemptContext; import
org.apache.hadoop.mapreduce.lib.input.FileSplit; import
org.apache.spark.deploy.SparkHadoopUtil;
public class RichFileRecordReader extends RecordReader<Text,
FileInfoWritable> { private String author; private String
createdDate; private String owner; private String lastModified;
private String content; private boolean processed;
private Text key; private Path path; private FileSystem fs;
public RichFileRecordReader() {
}
@Override public void initialize(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException
{ // this.recordReader.initialize(split, context); final
FileSplit fileSplit = (FileSplit) split; final Path path =
fileSplit.getPath(); this.fs =
path.getFileSystem(SparkHadoopUtil.get().getConfigurationFromJobContext(context));
final FileStatus stat = this.fs.getFileStatus(path); this.path =
path; this.author = stat.getOwner(); this.createdDate =
String.valueOf(stat.getModificationTime()); this.lastModified =
String.valueOf(stat.getAccessTime()); this.key = new
Text(path.toString()); }
@Override public boolean nextKeyValue() throws IOException,
InterruptedException { // TODO Auto-generated method stub
FSDataInputStream stream = null; try { if (!processed) {
int len = (int) this.fs.getFileStatus(this.path).getLen();
final byte[] data = new byte[len];
stream = this.fs.open(this.path);
int read = stream.read(data);
String content = new String(data, 0, read);
this.content = content;
processed = true;
return true; } } catch (IOException e) { e.printStackTrace(); if (stream != null) {
try {
stream.close();
} catch (IOException ie) {
ie.printStackTrace();
} } } return false; }
@Override public Text getCurrentKey() throws IOException,
InterruptedException { // TODO Auto-generated method stub return
this.key; }
@Override public FileInfoWritable getCurrentValue() throws
IOException, InterruptedException { // TODO Auto-generated method
stub
final FileInfoWritable fileInfo = new FileInfoWritable();
fileInfo.setContent(this.content);
fileInfo.setAuthor(this.author);
fileInfo.setCreatedDate(this.createdDate);
fileInfo.setOwner(this.owner);
fileInfo.setPath(this.path.toString()); return fileInfo; }
@Override public float getProgress() throws IOException,
InterruptedException { // TODO Auto-generated method stub return
processed ? 1.0f : 0.0f; }
@Override public void close() throws IOException { // TODO
Auto-generated method stub
}
}
- Writable 类
package nk.Whosebug.spark;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.hadoop.io.Writable;
import com.google.common.base.Charsets;
/**
 * Writable carrying a file's content plus a few metadata strings.
 *
 * <p>Wire format: four strings in the order createdDate, owner, content, path. Each string is
 * an {@code int} byte count followed by that many UTF-8 bytes; a count of {@code -1} stands
 * for {@code null}. A last-modified value is not part of the format.
 */
public class FileInfoWritable implements Writable {
    private final static Charset CHARSET = Charsets.UTF_8;

    /** Length marker written in place of a byte count when the string is {@code null}. */
    private static final int NULL_LENGTH = -1;

    private String createdDate;
    private String owner;
    private String content;
    private String path;

    public FileInfoWritable() {
    }

    /** Restores all fields, in the order {@link #write(DataOutput)} emits them. */
    public void readFields(DataInput in) throws IOException {
        this.createdDate = readString(in);
        this.owner = readString(in);
        this.content = readString(in);
        this.path = readString(in);
    }

    /** Serializes all fields; the order must match {@link #readFields(DataInput)}. */
    public void write(DataOutput out) throws IOException {
        writeString(createdDate, out);
        writeString(owner, out);
        writeString(content, out);
        writeString(path, out);
    }

    /**
     * Reads one length-prefixed string.
     *
     * @return the decoded string, or {@code null} if the null marker was stored
     * @throws IOException if the stored length is negative but is not the null marker
     */
    private String readString(DataInput in) throws IOException {
        final int n = in.readInt();
        if (n == NULL_LENGTH) {
            return null;
        }
        if (n < 0) {
            throw new IOException("Corrupt FileInfoWritable stream: negative string length " + n);
        }
        final byte[] bytes = new byte[n];
        in.readFully(bytes);
        return new String(bytes, CHARSET);
    }

    /**
     * Writes one length-prefixed string.
     *
     * <p>The prefix is the encoded byte count, not {@code str.length()}: the two differ as
     * soon as the text contains a non-ASCII character, and the reader consumes bytes.
     */
    private void writeString(String str, DataOutput out) throws IOException {
        if (str == null) {
            out.writeInt(NULL_LENGTH);
            return;
        }
        final byte[] bytes = str.getBytes(CHARSET);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    public String getCreatedDate() {
        return createdDate;
    }

    public void setCreatedDate(String createdDate) {
        this.createdDate = createdDate;
    }

    /** @return the owner; author and owner share one field */
    public String getAuthor() {
        return owner;
    }

    /** Stores {@code author} in the owner field; equivalent to {@link #setOwner(String)}. */
    public void setAuthor(String author) {
        this.owner = author;
    }

    public String getOwner() {
        return owner;
    }

    public void setOwner(String owner) {
        this.owner = owner;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }
}
- 演示用法的主类
package nk.Whosebug.spark;
import org.apache.hadoop.conf.Configuration; import
org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import
org.apache.spark.api.java.JavaPairRDD; import
org.apache.spark.api.java.JavaSparkContext; import
org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class CustomInputFormat { public static void main(String[]
args) {
SparkConf conf = new SparkConf();
conf.setAppName(args[0]);
conf.setMaster("local[*]");
final String inputPath = args[1];
JavaSparkContext sc = new
JavaSparkContext(conf);
JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(inputPath, RichFileInputFormat.class,
Text.class,
FileInfoWritable.class, new Configuration());
rdd.foreach(new VoidFunction<Tuple2<Text, FileInfoWritable>>() {
public void call(Tuple2<Text, FileInfoWritable> t) throws
Exception {
final Text filePath = t._1();
final String fileContent = t._2().getContent();
System.out.println("file " + filePath + " has contents= " + fileContent); } });
sc.close(); } }
要在 Spark 中加载文件,我使用这些内置方法:
JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(SOURCE_PATH);
或
JavaPairRDD<String, String> miao = jsc.wholeTextFiles(SOURCE_PATH);
这样我就得到了从文件夹中读取的文件的字节或字符串表示,它存储在 PairRDD 的值(value)中,而键(key)则是文件名。
我怎样才能得到这些文件的详细信息?例如:
File miao = new File(path);
//this kind of details
String date = miao.getLastModified();
我是否应该将它们重新转换回文件,然后读取,然后将它们制成另一个字节数组?有没有更快的流程?
使用 map 转换来处理此 RDD。在您的 map 函数中调用一个接受字符串(即您的文件名)的函数,并使用此字符串打开和处理文件。所以它只不过是一个 map RDD 转换,为这个 RDD 的每一行调用一个函数。
您可以编写自定义输入格式并将该 inputFormatClass 传递给 SparkContext 上的 newAPIHadoopFile 方法。此 inputFormat 将使用自定义 RecordReader,自定义 RecordReader 将读取 fileContent 以及其他文件相关信息(即作者、修改日期等)。您还需要编写一个自定义的 Writable 类,用来保存文件信息以及 RecordReader 读取到的 fileContent。
完整的工作代码如下。此代码使用名为 RichFileInputFormat 的自定义输入格式 class。 RichFileInputFormat 是一个 wholeFileInputFormat,这意味着每个输入文件只有一个拆分。这进一步意味着 rdd 分区的数量将等于输入文件的数量。因此,如果您的输入路径包含 10 个文件,那么不管输入文件的大小如何,生成的 rdd 将包含 10 个分区。
这是从 SparkContext 调用此自定义 inputFormat 以加载文件的方式:-
JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(args[1], RichFileInputFormat.class, Text.class,FileInfoWritable.class, new Configuration());
因此您的 rdd 键将是文件路径,值将是一个包含文件内容和其他文件相关信息的 FileInfoWritable。
下面粘贴了完整的工作代码:-
- 自定义输入格式类
package nk.Whosebug.spark;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Input format that hands every input file to a single record reader.
 *
 * <p>Records are produced by {@link RichFileRecordReader}; the value type is
 * {@link FileInfoWritable}. Because {@link #isSplitable(JobContext, Path)} always returns
 * {@code false}, a file is never divided into several splits.
 */
public class RichFileInputFormat extends FileInputFormat<Text, FileInfoWritable> {

    /**
     * Creates the reader for one split.
     *
     * @param split   the split to read; not inspected here
     * @param context the task context; not inspected here
     * @return a new, uninitialized {@link RichFileRecordReader}
     */
    @Override
    public RecordReader<Text, FileInfoWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new RichFileRecordReader();
    }

    /**
     * Disables splitting for every file.
     *
     * <p>{@code @Override} is deliberate: if the signature ever drifts from the parent's, the
     * compiler reports it instead of silently leaving the parent's splitting behavior in place.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
- RecordReader 类
package nk.Whosebug.spark;
import java.io.IOException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.spark.deploy.SparkHadoopUtil; public class RichFileRecordReader extends RecordReader<Text, FileInfoWritable> { private String author; private String createdDate; private String owner; private String lastModified; private String content; private boolean processed; private Text key; private Path path; private FileSystem fs; public RichFileRecordReader() { } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // this.recordReader.initialize(split, context); final FileSplit fileSplit = (FileSplit) split; final Path path = fileSplit.getPath(); this.fs = path.getFileSystem(SparkHadoopUtil.get().getConfigurationFromJobContext(context)); final FileStatus stat = this.fs.getFileStatus(path); this.path = path; this.author = stat.getOwner(); this.createdDate = String.valueOf(stat.getModificationTime()); this.lastModified = String.valueOf(stat.getAccessTime()); this.key = new Text(path.toString()); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub FSDataInputStream stream = null; try { if (!processed) { int len = (int) this.fs.getFileStatus(this.path).getLen(); final byte[] data = new byte[len]; stream = this.fs.open(this.path); int read = stream.read(data); String content = new String(data, 0, read); this.content = content; processed = true; return true; } } catch (IOException e) { e.printStackTrace(); if (stream != null) { try { stream.close(); } catch (IOException ie) { ie.printStackTrace(); } } } 
return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.key; } @Override public FileInfoWritable getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub final FileInfoWritable fileInfo = new FileInfoWritable(); fileInfo.setContent(this.content); fileInfo.setAuthor(this.author); fileInfo.setCreatedDate(this.createdDate); fileInfo.setOwner(this.owner); fileInfo.setPath(this.path.toString()); return fileInfo; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return processed ? 1.0f : 0.0f; } @Override public void close() throws IOException { // TODO Auto-generated method stub } }
- Writable 类
package nk.Whosebug.spark;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.charset.Charset; import org.apache.hadoop.io.Writable; import com.google.common.base.Charsets; public class FileInfoWritable implements Writable { private final static Charset CHARSET = Charsets.UTF_8; private String createdDate; private String owner; // private String lastModified; private String content; private String path; public FileInfoWritable() { } public void readFields(DataInput in) throws IOException { this.createdDate = readString(in); this.owner = readString(in); // this.lastModified = readString(in); this.content = readString(in); this.path = readString(in); } public void write(DataOutput out) throws IOException { writeString(createdDate, out); writeString(owner, out); // writeString(lastModified, out); writeString(content, out); writeString(path, out); } private String readString(DataInput in) throws IOException { final int n = in.readInt(); final byte[] content = new byte[n]; in.readFully(content); return new String(content, CHARSET); } private void writeString(String str, DataOutput out) throws IOException { out.writeInt(str.length()); out.write(str.getBytes(CHARSET)); } public String getCreatedDate() { return createdDate; } public void setCreatedDate(String createdDate) { this.createdDate = createdDate; } public String getAuthor() { return owner; } public void setAuthor(String author) { this.owner = author; } /*public String getLastModified() { return lastModified; }*/ /*public void setLastModified(String lastModified) { this.lastModified = lastModified; }*/ public String getOwner() { return owner; } public void setOwner(String owner) { this.owner = owner; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public String getPath() { return path; } public void setPath(String path) { this.path = path; } }
- 演示用法的主类
package nk.Whosebug.spark;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; public class CustomInputFormat { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName(args[0]); conf.setMaster("local[*]"); final String inputPath = args[1]; JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(inputPath, RichFileInputFormat.class, Text.class, FileInfoWritable.class, new Configuration()); rdd.foreach(new VoidFunction<Tuple2<Text, FileInfoWritable>>() { public void call(Tuple2<Text, FileInfoWritable> t) throws Exception { final Text filePath = t._1(); final String fileContent = t._2().getContent(); System.out.println("file " + filePath + " has contents= " + fileContent); } }); sc.close(); } }