将数据写入 Elasticsearch:EsHadoopSerializationException
Writing data to Elasticsearch:EsHadoopSerializationException
我正在使用 Elasticsearch 5.4 和 Hadoop 2.7.3,想将数据从 HDFS 写入 Elasticsearch。我的 blog.json 中的数据如下:
{"id":"1","title":"git简介","posttime":"2016-06-11","content":"svn与git的最主要区别..."}
{"id":"2","title":"ava中泛型的介绍与简单使用","posttime":"2016-06-12","content":"基本操作:CRUD ..."}
{"id":"3","title":"SQL基本操作","posttime":"2016-06-13","content":"svn与git的最主要区别..."}
{"id":"4","title":"Hibernate框架基础","posttime":"2016-06-14","content":"Hibernate框架基础..."}
{"id":"5","title":"Shell基本知识","posttime":"2016-06-15","content":"Shell是什么..."}
我把blog.json放到HDFS:
hadoop fs -put blog.json /work
然后我启动 Elasticsearch 5.4 并编写我的 java 代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import java.io.IOException;
/**
* Created by bee on 4/1/17.
*/
public class HdfsToES {
public static class MyMapper extends Mapper<Object, Text, NullWritable,
BytesWritable> {
public void map(Object key, Text value, Mapper<Object, Text,
NullWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
byte[] line = value.toString().trim().getBytes();
BytesWritable blog = new BytesWritable(line);
context.write(NullWritable.get(), blog);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("es.nodes", "localhost:9200/");
conf.set("es.resource", "blog/csdn");
conf.set("es.mapping.id", "id");
conf.set("es.input.json", "yes");
Job job = Job.getInstance(conf, "hadoop es write test");
job.setMapperClass(HdfsToES.MyMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
// 设置输入路径
FileInputFormat.setInputPaths(job, new Path
("hdfs://localhost:9000//work/blog.json"));
job.waitForCompletion(true);
}
}
我在 Elasticsearch 中得到了一个没有任何数据的空索引,并出现以下异常:
java.lang.Exception: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: org.codehaus.jackson.JsonParseException: Unexpected character ('b' (code 98)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: [B@5cdff749; line: 1, column: 3]
它适用于 Elasticsearch 2.3,但不适用于 5.4。我该如何更新我的代码?
/**
 * Mapper that emits each input line as a {@code BytesWritable} value paired with a
 * {@code NullWritable} key.
 */
public static class MyMapper extends Mapper<Object, Text, NullWritable,
BytesWritable> {
/**
 * Trims the input line, converts it to bytes and writes it to the context.
 *
 * @param key     input key (not used by this method)
 * @param value   the input line
 * @param context context the output record is written to
 * @throws IOException          declared by the signature; may come from context.write
 * @throws InterruptedException declared by the signature; may come from context.write
 */
public void map(Object key, Text value, Mapper<Object, Text,
NullWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
// getBytes() without a charset argument encodes with the platform default charset.
byte[] line = value.toString().trim().getBytes();
BytesWritable blog = new BytesWritable(line);
context.write(NullWritable.get(), blog);
}
}
public static class someMapper extends Mapper<Object, Text, NullWritable, Text>
job.setMapOutputValueClass(Text.class);
不要使用 BytesWritable 作为 map 输出值的类型,只需使用 Text。这在我的 Elasticsearch 6.1.1 和 Hadoop 2.7.3 上可以正常工作。
我正在使用 Elasticsearch 5.4 和 Hadoop 2.7.3,想将数据从 HDFS 写入 Elasticsearch。我的 blog.json 中的数据如下:
{"id":"1","title":"git简介","posttime":"2016-06-11","content":"svn与git的最主要区别..."}
{"id":"2","title":"ava中泛型的介绍与简单使用","posttime":"2016-06-12","content":"基本操作:CRUD ..."}
{"id":"3","title":"SQL基本操作","posttime":"2016-06-13","content":"svn与git的最主要区别..."}
{"id":"4","title":"Hibernate框架基础","posttime":"2016-06-14","content":"Hibernate框架基础..."}
{"id":"5","title":"Shell基本知识","posttime":"2016-06-15","content":"Shell是什么..."}
我把blog.json放到HDFS:
hadoop fs -put blog.json /work
然后我启动 Elasticsearch 5.4 并编写我的 java 代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import java.io.IOException;
/**
* Created by bee on 4/1/17.
*/
public class HdfsToES {
public static class MyMapper extends Mapper<Object, Text, NullWritable,
BytesWritable> {
public void map(Object key, Text value, Mapper<Object, Text,
NullWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
byte[] line = value.toString().trim().getBytes();
BytesWritable blog = new BytesWritable(line);
context.write(NullWritable.get(), blog);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("es.nodes", "localhost:9200/");
conf.set("es.resource", "blog/csdn");
conf.set("es.mapping.id", "id");
conf.set("es.input.json", "yes");
Job job = Job.getInstance(conf, "hadoop es write test");
job.setMapperClass(HdfsToES.MyMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
// 设置输入路径
FileInputFormat.setInputPaths(job, new Path
("hdfs://localhost:9000//work/blog.json"));
job.waitForCompletion(true);
}
}
我在 Elasticsearch 中得到了一个没有任何数据的空索引,并出现以下异常:
java.lang.Exception: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: org.codehaus.jackson.JsonParseException: Unexpected character ('b' (code 98)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: [B@5cdff749; line: 1, column: 3]
它适用于 Elasticsearch 2.3,但不适用于 5.4。我该如何更新我的代码?
/**
 * Mapper that emits each input line as a {@code BytesWritable} value paired with a
 * {@code NullWritable} key.
 */
public static class MyMapper extends Mapper<Object, Text, NullWritable,
BytesWritable> {
/**
 * Trims the input line, converts it to bytes and writes it to the context.
 *
 * @param key     input key (not used by this method)
 * @param value   the input line
 * @param context context the output record is written to
 * @throws IOException          declared by the signature; may come from context.write
 * @throws InterruptedException declared by the signature; may come from context.write
 */
public void map(Object key, Text value, Mapper<Object, Text,
NullWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
// getBytes() without a charset argument encodes with the platform default charset.
byte[] line = value.toString().trim().getBytes();
BytesWritable blog = new BytesWritable(line);
context.write(NullWritable.get(), blog);
}
}
public static class someMapper extends Mapper<Object, Text, NullWritable, Text>
job.setMapOutputValueClass(Text.class);
不要使用 BytesWritable 作为 map 输出值的类型,只需使用 Text。这在我的 Elasticsearch 6.1.1 和 Hadoop 2.7.3 上可以正常工作。