Hadoop NTriplesMapper (apache.jena) 无法正常工作,映射输入记录=0
Hadoop NTriplesMapper (apache.jena) not working properly, Map input records=0
我正在研究 PageRank 算法的实现,该算法使用 Hadoop、MapReduce 和 RDF 三元组作为源。
到目前为止,代码非常简单,main class 有一个作业,然后是 mapper 和 reducer。输入文件是一个.nt文件,里面全是rdf三元组,比如:
<http://dbpedia.org/resource/Anarchism> <http://dbpedia.org/ontology/wikiPageWikiLink> <http://dbpedia.org/resource/Red_Army> .
Mapper 应该将这些三元组映射到主题、对象对中。对于给定的 rdf 它将是:
<http://dbpedia.org/resource/Anarchism> <http://dbpedia.org/resource/Red_Army>
Reducer 应该将这些对分组到包含主题、基本 PageRank (1) 和对象列表的行中。例如:
<http://dbpedia.org/resource/Anarchism> 1.0 <http://dbpedia.org/resource/Red_Army>,<http://dbpedia.org/resource/Joseph_Conrad>
我在 windows 上使用 hadoop 2.3.0。显然它配置正确,因为这样的 WordCount 示例可以在其上运行。 (edit) 在 hadoop 2.6.0 上的 linux 下也试过,没有更好的效果,结果是一样的。
我正在使用以下命令执行 jar:
hadoop jar 'C:\hwork\PageRankHadoop.jar' PageRankHadoop /in /output --all
对于大约 1500 行长的输入文件,执行大约需要 1 分钟,但会生成空输出(包括 _SUCCESS sic!)。显然映射器无法正常工作,因为在日志中我可以看到
Map-Reduce Framework
Map input records=0
Map output records=0
Map output bytes=0
今天用这段代码摆弄了 8 个小时,但没有得到一个输出。因此,我正在寻求您的帮助,各位编码员。
我会在代码下方粘贴更多来自作业执行的日志,这可能会有所帮助。我还注意到,在作业执行期间,每当作业 运行 是一个映射器时,hadoop namenode 都会抛出
15/04/27 21:15:59 INFO ipc.Server: Socket Reader #1 for port 9000: readAndProcess from client 127.0.0.1 threw exception [java.io.IOException: An existing connection was forcibly closed by the remote host]
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(Unknown Source)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source)
at sun.nio.ch.IOUtil.read(Unknown Source)
at sun.nio.ch.SocketChannelImpl.read(Unknown Source)
at org.apache.hadoop.ipc.Server.channelRead(Server.java:2502)
at org.apache.hadoop.ipc.Server.access00(Server.java:124)
at org.apache.hadoop.ipc.Server$Connection.readAndProcess(Server.java:1410)
at org.apache.hadoop.ipc.Server$Listener.doRead(Server.java:708)
at org.apache.hadoop.ipc.Server$Listener$Reader.doRunLoop(Server.java:582)
at org.apache.hadoop.ipc.Server$Listener$Reader.run(Server.java:553)
根据一些文章,我发现它不会破坏我的映射器,但它确实对我来说看起来很可疑,我不知道为什么会这样。
主要class:
public class PageRankHadoop {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
Job job = new Job(conf, "Page Rank RDF Hadoop");
job.setJarByClass(PageRankHadoop.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(NTriplesMapper.class);
job.setReducerClass(NTriplesReducer.class);
job.setInputFormatClass(NTriplesInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
} catch (IOException | IllegalStateException | IllegalArgumentException | InterruptedException | ClassNotFoundException e) {
System.err.println("Error! " + e.getMessage());
e.printStackTrace(System.err);
}
}
}
Mapper:
public class NTriplesMapper extends Mapper<LongWritable, TripleWritable, LongWritable, Text> {
@Override
protected void map(LongWritable key, TripleWritable value, Context context) {
try {
context.write(key, new Text(value.get().getObject().getURI()));
} catch (IOException | InterruptedException ex) {
System.err.println("Mapper error: " + ex.getMessage());
ex.printStackTrace(System.err);
}
}
}
Reducer:
public class NTriplesReducer extends Reducer<LongWritable, Text, Text, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) {
String pageRankList = "1.0";
for (Text value : values) {
pageRankList += "," + value.toString();
}
try {
context.write(new Text(key.toString()), new Text(pageRankList));
} catch (IOException | InterruptedException ex) {
System.err.println("Reducer error: " + ex.getMessage());
ex.printStackTrace(System.err);
}
}
}
Shell 作业执行日志:
http://pastebin.com/Uf0zH20H
来自 hadoop\logs\userlogs 的系统日志:
http://pastebin.com/gNCWDsr7
编辑,在代码中添加了记录器,没有抛出异常。还尝试在 hadoop 2.6.0 linux 下 运行 此代码,结果与 windows
下的 hadoop 2.3.0 相同
您的代码可能存在很多问题,我将尝试并强调这些问题,但目前尚不清楚是哪一个问题导致了问题。
吞咽错误
好吧,第一个明显的问题是您的代码吞噬了错误:
catch (IOException | IllegalStateException | IllegalArgumentException | InterruptedException | ClassNotFoundException e) {
}
这意味着您的作业抛出的任何错误都会被静默抑制。至少你应该将错误转储到控制台,例如
catch (Throwable e) {
System.err.println(e.getMessage());
e.printStackTrace(System.err);
}
这是我要更改的第一件事,如果您随后开始看到一条错误消息,这将为您指出问题的实际原因。
类型签名
其次,您在 Mapper
和 Reducer
中直接使用了 Triple
类型。 Triple
类型是标准 Java 对象,不能用作 Hadoop Writable
类型。
要在 Hadoop 上使用 RDF 数据,您需要使用 Apache Jena Elephas 库(您似乎至少在部分代码中这样做)和 TripleWritable
类型,所以它是不清楚为什么 Hadoop 甚至允许您的代码 compile/run.
文件读取问题
一个可能的问题是您可能需要明确指定要递归搜索输入路径。根据 this answer 尝试在为您的作业设置输入路径之前添加以下内容:
FileInputFormat.setInputDirRecursive(true);
Hadoop 版本不匹配
您使用的是 Hadoop 2.3.0,而 Elephas 是为 2.6.0 构建的 - 我不认为 Elephas 使用任何不向后兼容的 API,但如果所有其他方法都失败了,您可以尝试自己针对 Hadoop 构建库Using Alternative Hadoop Versions
上文档的版本
Hadoop 版本不匹配,或者说 Jena 版本是问题所在。一个依赖项太旧了,确实没有给出任何提示,但使用最新版本解决了这个问题。
我正在研究 PageRank 算法的实现,该算法使用 Hadoop、MapReduce 和 RDF 三元组作为源。
到目前为止,代码非常简单,main class 有一个作业,然后是 mapper 和 reducer。输入文件是一个.nt文件,里面全是rdf三元组,比如:
<http://dbpedia.org/resource/Anarchism> <http://dbpedia.org/ontology/wikiPageWikiLink> <http://dbpedia.org/resource/Red_Army> .
Mapper 应该将这些三元组映射到主题、对象对中。对于给定的 rdf 它将是:
<http://dbpedia.org/resource/Anarchism> <http://dbpedia.org/resource/Red_Army>
Reducer 应该将这些对分组到包含主题、基本 PageRank (1) 和对象列表的行中。例如:
<http://dbpedia.org/resource/Anarchism> 1.0 <http://dbpedia.org/resource/Red_Army>,<http://dbpedia.org/resource/Joseph_Conrad>
我在 windows 上使用 hadoop 2.3.0。显然它配置正确,因为这样的 WordCount 示例可以在其上运行。 (edit) 在 hadoop 2.6.0 上的 linux 下也试过,没有更好的效果,结果是一样的。
我正在使用以下命令执行 jar:
hadoop jar 'C:\hwork\PageRankHadoop.jar' PageRankHadoop /in /output --all
对于大约 1500 行长的输入文件,执行大约需要 1 分钟,但会生成空输出(包括 _SUCCESS sic!)。显然映射器无法正常工作,因为在日志中我可以看到
Map-Reduce Framework
Map input records=0
Map output records=0
Map output bytes=0
今天用这段代码摆弄了 8 个小时,但没有得到一个输出。因此,我正在寻求您的帮助,各位编码员。
我会在代码下方粘贴更多来自作业执行的日志,这可能会有所帮助。我还注意到,在作业执行期间,每当作业 运行 是一个映射器时,hadoop namenode 都会抛出
15/04/27 21:15:59 INFO ipc.Server: Socket Reader #1 for port 9000: readAndProcess from client 127.0.0.1 threw exception [java.io.IOException: An existing connection was forcibly closed by the remote host]
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(Unknown Source)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source)
at sun.nio.ch.IOUtil.read(Unknown Source)
at sun.nio.ch.SocketChannelImpl.read(Unknown Source)
at org.apache.hadoop.ipc.Server.channelRead(Server.java:2502)
at org.apache.hadoop.ipc.Server.access00(Server.java:124)
at org.apache.hadoop.ipc.Server$Connection.readAndProcess(Server.java:1410)
at org.apache.hadoop.ipc.Server$Listener.doRead(Server.java:708)
at org.apache.hadoop.ipc.Server$Listener$Reader.doRunLoop(Server.java:582)
at org.apache.hadoop.ipc.Server$Listener$Reader.run(Server.java:553)
根据一些文章,我发现它不会破坏我的映射器,但它确实对我来说看起来很可疑,我不知道为什么会这样。
主要class:
public class PageRankHadoop {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
Job job = new Job(conf, "Page Rank RDF Hadoop");
job.setJarByClass(PageRankHadoop.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(NTriplesMapper.class);
job.setReducerClass(NTriplesReducer.class);
job.setInputFormatClass(NTriplesInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
} catch (IOException | IllegalStateException | IllegalArgumentException | InterruptedException | ClassNotFoundException e) {
System.err.println("Error! " + e.getMessage());
e.printStackTrace(System.err);
}
}
}
Mapper:
public class NTriplesMapper extends Mapper<LongWritable, TripleWritable, LongWritable, Text> {
@Override
protected void map(LongWritable key, TripleWritable value, Context context) {
try {
context.write(key, new Text(value.get().getObject().getURI()));
} catch (IOException | InterruptedException ex) {
System.err.println("Mapper error: " + ex.getMessage());
ex.printStackTrace(System.err);
}
}
}
Reducer:
public class NTriplesReducer extends Reducer<LongWritable, Text, Text, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) {
String pageRankList = "1.0";
for (Text value : values) {
pageRankList += "," + value.toString();
}
try {
context.write(new Text(key.toString()), new Text(pageRankList));
} catch (IOException | InterruptedException ex) {
System.err.println("Reducer error: " + ex.getMessage());
ex.printStackTrace(System.err);
}
}
}
Shell 作业执行日志: http://pastebin.com/Uf0zH20H 来自 hadoop\logs\userlogs 的系统日志: http://pastebin.com/gNCWDsr7
编辑,在代码中添加了记录器,没有抛出异常。还尝试在 hadoop 2.6.0 linux 下 运行 此代码,结果与 windows
下的 hadoop 2.3.0 相同您的代码可能存在很多问题,我将尝试并强调这些问题,但目前尚不清楚是哪一个问题导致了问题。
吞咽错误
好吧,第一个明显的问题是您的代码吞噬了错误:
catch (IOException | IllegalStateException | IllegalArgumentException | InterruptedException | ClassNotFoundException e) {
}
这意味着您的作业抛出的任何错误都会被静默抑制。至少你应该将错误转储到控制台,例如
catch (Throwable e) {
System.err.println(e.getMessage());
e.printStackTrace(System.err);
}
这是我要更改的第一件事,如果您随后开始看到一条错误消息,这将为您指出问题的实际原因。
类型签名
其次,您在 Mapper
和 Reducer
中直接使用了 Triple
类型。 Triple
类型是标准 Java 对象,不能用作 Hadoop Writable
类型。
要在 Hadoop 上使用 RDF 数据,您需要使用 Apache Jena Elephas 库(您似乎至少在部分代码中这样做)和 TripleWritable
类型,所以它是不清楚为什么 Hadoop 甚至允许您的代码 compile/run.
文件读取问题
一个可能的问题是您可能需要明确指定要递归搜索输入路径。根据 this answer 尝试在为您的作业设置输入路径之前添加以下内容:
FileInputFormat.setInputDirRecursive(true);
Hadoop 版本不匹配
您使用的是 Hadoop 2.3.0,而 Elephas 是为 2.6.0 构建的 - 我不认为 Elephas 使用任何不向后兼容的 API,但如果所有其他方法都失败了,您可以尝试自己针对 Hadoop 构建库Using Alternative Hadoop Versions
上文档的版本Hadoop 版本不匹配,或者说 Jena 版本是问题所在。一个依赖项太旧了,确实没有给出任何提示,但使用最新版本解决了这个问题。