Hadoop 2: Empty result when using custom InputFormat
I want to use my own FileInputFormat with a custom RecordReader to read csv data into <Long, String> pairs.

Therefore I created the class MyTextInputFormat:
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyTextInputFormat extends FileInputFormat<Long, String> {

    @Override
    public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(input.toString());
        return new MyStringRecordReader(job, (FileSplit) input);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return super.isSplitable(fs, filename);
    }
}
and the class MyStringRecordReader:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class MyStringRecordReader implements RecordReader<Long, String> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();
        System.out.println("constructor called");
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }

    @Override
    public Long createKey() {
        return lineKey.get();
    }

    @Override
    public String createValue() {
        System.out.println("createValue called");
        return lineValue.toString();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public boolean next(Long key, String value) throws IOException {
        System.out.println("next called");
        // get the next line
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }
        key = lineKey.get();
        value = lineValue.toString();
        System.out.println(key);
        System.out.println(value);
        return true;
    }
}
In my Spark application I read the file by calling the sparkContext.hadoopFile method. But I only get an empty output from the following code:
public class AssociationRulesAnalysis {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class)
                .map(new Function<Tuple2<Long, String>, String>() {
                    @Override
                    public String call(Tuple2<Long, String> arg0) throws Exception {
                        System.out.println("map: " + arg0._2());
                        return arg0._2();
                    }
                });

        List<String> asList = inputRdd.take(10);
        for (String s : asList) {
            System.out.println(s);
        }
    }
}
I only get 10 empty lines back from the RDD. The console output with the added prints looks like this:
=== APP STARTED : local-1467182320798
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
next called
54
ä11
map:
next called
60
ß12
map:
next called
12
=====================
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
Stopping...
The RDD data is printed below the ===== output (10 empty lines!!!). The output above the ===== seems to be produced by the RDD.count call. Inside the next method the correct keys and values are printed!? What am I doing wrong?
The key and value objects that are passed into the overridden next method of MyStringRecordReader are never populated from lineKey and lineValue, so the consumer of the RecordReader always sees an EMPTY string. Java passes references by value, so reassigning the key and value parameters inside next only rebinds the local variables; and because Long and String are immutable, they cannot be filled in place the way the old mapred API expects. The caller therefore keeps the 0L and the empty String that createKey()/createValue() returned when the reader was constructed.
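A tiny standalone illustration (not part of the question's code) of why the reassignment inside next can never reach the caller:

public class PassByValueDemo {

    // Reassigning the parameter only rebinds the local reference;
    // the caller's variable still refers to the old, empty String.
    static void next(String value) {
        value = "line read from file";
    }

    public static void main(String[] args) {
        String v = "";
        next(v);
        System.out.println("[" + v + "]"); // prints [] -- still empty
    }
}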
If you want the records read from the file to reach the caller with a real key and value, you have to take the key and value objects passed into the next method and initialize them with the computed key and value, which is only possible with mutable types such as LongWritable and Text. If you do not intend to change the key/value of the record, remove the following two lines; as written they have no effect outside next, so the caller keeps seeing an EMPTY string and 0L instead of what was read from the file.
key = lineKey.get();
value = lineValue.toString();
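A minimal sketch of one possible fix (the class name MyWritableRecordReader is made up for this example, not taken from the question): keep the delegation to LineRecordReader but expose Hadoop's mutable LongWritable/Text types, so that next can fill the caller's objects in place as the old mapred API expects. MyTextInputFormat would then have to extend FileInputFormat<LongWritable, Text> and return this reader.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class MyWritableRecordReader implements RecordReader<LongWritable, Text> {

    private final LineRecordReader lineReader;

    public MyWritableRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
    }

    @Override
    public LongWritable createKey() {
        return lineReader.createKey();
    }

    @Override
    public Text createValue() {
        return lineReader.createValue();
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        // LineRecordReader mutates the passed-in objects, so the caller
        // (Spark's HadoopRDD) actually sees the byte offset and line text.
        return lineReader.next(key, value);
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }
}

On the Spark side the same file would then be read with LongWritable.class and Text.class and converted to plain strings inside the map function. Hadoop reuses the Writable instances between records, so copy the value out with toString() (and the key with get()) before collecting:

JavaPairRDD<LongWritable, Text> raw =
        sc.hadoopFile(inputFilePath, MyTextInputFormat.class, LongWritable.class, Text.class);

JavaRDD<String> inputRdd = raw.map(new Function<Tuple2<LongWritable, Text>, String>() {
    @Override
    public String call(Tuple2<LongWritable, Text> record) throws Exception {
        return record._2().toString(); // copy out of the reused Text instance
    }
});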