Spark - 以字节格式读取文件作为 InputStream
Spark - Read file in byte format as InputStream
如何读取字节数组格式的多个文件作为 spark 作业中的输入流?
Path pt = new Path(umfPaths);
FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
fs.open(pt);
.. 原因是我有内容为字节格式的输入文件。然后将输入文件拆分为块长度大小为 64 MB 的多个文件并存储在 HDFS 中。我必须使用 Apache spark 并行处理文件。请求是将整个 64MB 的块作为单个文件读取并处理它。通过编写自定义记录 reader 或使用文件系统 API(使用 InputStream)来读取每个文件是否有效?
我通过在 SparkContext 和 newHadoopApiFile 中使用 API 解决了这个问题。
我写了一个 CustomInputFormat class 它将做 InputFormat 的事情并将 return 一个 POJO 对象。
JavaPairRDD> baseRDD =
sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
ArrayList.class, conf);
然后忽略 Key,只创建一个 Values 的 RDD。
JavaRDD> mapLines1 = baseRDD.values();
然后对上面的RDD做了一个FlatMap。
在 InputFormat 内部 class 我扩展了 FileInputFormat 并将 isSplittable 覆盖为 false 以作为单个文件读取。
public class InputFormat extends FileInputFormat {
public
RecordReader<NullWritable, ArrayList<Record>>
createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException{ //Logic Goes here }
@Override
protected boolean isSplitable(JobContext context, Path file) { return false;
}
}
如何读取字节数组格式的多个文件作为 spark 作业中的输入流?
Path pt = new Path(umfPaths);
FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
fs.open(pt);
.. 原因是我有内容为字节格式的输入文件。然后将输入文件拆分为块长度大小为 64 MB 的多个文件并存储在 HDFS 中。我必须使用 Apache spark 并行处理文件。请求是将整个 64MB 的块作为单个文件读取并处理它。通过编写自定义记录 reader 或使用文件系统 API(使用 InputStream)来读取每个文件是否有效?
我通过在 SparkContext 和 newHadoopApiFile 中使用 API 解决了这个问题。 我写了一个 CustomInputFormat class 它将做 InputFormat 的事情并将 return 一个 POJO 对象。
JavaPairRDD> baseRDD = sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class, ArrayList.class, conf);
然后忽略 Key,只创建一个 Values 的 RDD。
JavaRDD> mapLines1 = baseRDD.values();
然后对上面的RDD做了一个FlatMap。
在 InputFormat 内部 class 我扩展了 FileInputFormat 并将 isSplittable 覆盖为 false 以作为单个文件读取。
public class InputFormat extends FileInputFormat { public RecordReader<NullWritable, ArrayList<Record>> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{ //Logic Goes here } @Override protected boolean isSplitable(JobContext context, Path file) { return false; } }