org.apache.avro.AvroTypeException:期望 record-start(记录开始),实际得到 VALUE_STRING
org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
我正在做一个简单的 JSON 到 Avro 记录的转换,但遇到了上面这个异常。我尝试了很多方法,也试过了来自 Stack Overflow 和网上其他地方的超过 15 个解决方案。
我的 schema 文件(aaa.avsc)如下:
{
"namespace": "test",
"type": "record",
"name": "root",
"doc": "This stream contains raw data.",
"fields": [
{
"name": "aaa",
"doc": "You should not edit this portion.",
"type": {
"type": "record",
"name": "EnterpriseEventEnvelopeRecord",
"fields": [
{
"name": "eventId",
"type": "string",
"default": "",
"doc": "Unique Identifier."
},
{
"name": "eventAction",
"type": [
"null",
{
"type": "enum",
"name": "actionTypes",
"symbols": [
"Updated",
"Created",
"Requested",
"Deleted",
"Verified",
"Received",
"Completed",
"Failed",
"Abandoned"
]
}
],
"default": null,
"doc": "A verb indicating what happened."
}
]
}
}
]
}
我的输入json:
{"aaa" : {"eventId" : "omar", "eventAction" : "Requested"}}
我的Class:
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import java.io.*;
import static java.nio.file.Files.readAllBytes;
import static java.nio.file.Paths.get;
/**
 * Converts a JSON document into an Avro object container file, using a schema read from disk.
 *
 * <p>The JSON input must follow Avro's JSON encoding. In particular, a non-null value of a
 * union-typed field has to be wrapped in an object keyed by the branch's type name, for example
 * {@code {"eventAction": {"test.actionTypes": "Requested"}}}; a bare value is rejected by the
 * JSON decoder.
 */
public class FeedbackEvent {

  /**
   * Reads the schema and the JSON input from fixed paths and converts the JSON to Avro.
   *
   * @param args ignored
   * @throws Exception if a file cannot be read or the JSON does not match the schema
   */
  public static void main(String[] args) throws Exception {
    String jsonFile = "d:/aaa.txt";
    String schemaFile = "d:/aaa.avsc";
    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    // JSON text is UTF-8; decode explicitly instead of relying on the platform default charset.
    String json =
        new String(readAllBytes(get(jsonFile)), java.nio.charset.StandardCharsets.UTF_8);
    jsonToAvro(json, schema);
    System.out.println("Done....");
  }

  /**
   * Decodes every record found in {@code json} and writes them to an in-memory Avro data file.
   *
   * @param json one or more JSON-encoded records conforming to {@code schema}
   * @param schema the Avro schema used both to decode the JSON and to write the data file
   * @return the bytes of the resulting Avro object container file
   * @throws IOException if decoding or writing fails
   */
  public static byte[] jsonToAvro(String json, Schema schema) throws IOException {
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    ByteArrayOutputStream output = new ByteArrayOutputStream();

    // try-with-resources closes the writer (flushing any buffered block into `output`) and the
    // input stream on every path, without masking the original exception.
    try (InputStream input =
            new ByteArrayInputStream(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
      writer.create(schema, output);
      Decoder decoder = DecoderFactory.get().jsonDecoder(schema, input);

      while (true) {
        GenericRecord datum;
        try {
          datum = reader.read(null, decoder);
        } catch (EOFException endOfInput) {
          // Reading past the last record is reported as EOFException; that ends the loop.
          break;
        }
        writer.append(datum);
      }
    }

    // The writer is closed at this point, so `output` holds the complete data file.
    System.out.println(output);
    return output.toByteArray();
  }
}
如果联合(union)类型字段的值不为 null,则必须在 JSON 中指明要使用的联合分支。请参考 “Unions and Json Encoding”。
在您的例子中,该分支是 actionTypes,所以 JSON 应该写成:
{
"aaa": {
"eventId": "omar",
"eventAction": {
"test.actionTypes": "Requested"
}
}
}
您可能注意到,联合分支的名称前还要加上命名空间(namespace),即写成 test.actionTypes;上面提到的帖子中对此有清晰的解释。
希望对您有所帮助。
我正在做一个简单的 JSON 到 Avro 记录的转换,但遇到了上面这个异常。我尝试了很多方法,也试过了来自 Stack Overflow 和网上其他地方的超过 15 个解决方案。
我的 schema 文件(aaa.avsc)如下:
{
"namespace": "test",
"type": "record",
"name": "root",
"doc": "This stream contains raw data.",
"fields": [
{
"name": "aaa",
"doc": "You should not edit this portion.",
"type": {
"type": "record",
"name": "EnterpriseEventEnvelopeRecord",
"fields": [
{
"name": "eventId",
"type": "string",
"default": "",
"doc": "Unique Identifier."
},
{
"name": "eventAction",
"type": [
"null",
{
"type": "enum",
"name": "actionTypes",
"symbols": [
"Updated",
"Created",
"Requested",
"Deleted",
"Verified",
"Received",
"Completed",
"Failed",
"Abandoned"
]
}
],
"default": null,
"doc": "A verb indicating what happened."
}
]
}
}
]
}
我的输入json:
{"aaa" : {"eventId" : "omar", "eventAction" : "Requested"}}
我的Class:
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import java.io.*;
import static java.nio.file.Files.readAllBytes;
import static java.nio.file.Paths.get;
/**
 * Converts a JSON document into an Avro object container file, using a schema read from disk.
 *
 * <p>The JSON input must follow Avro's JSON encoding. In particular, a non-null value of a
 * union-typed field has to be wrapped in an object keyed by the branch's type name, for example
 * {@code {"eventAction": {"test.actionTypes": "Requested"}}}; a bare value is rejected by the
 * JSON decoder.
 */
public class FeedbackEvent {

  /**
   * Reads the schema and the JSON input from fixed paths and converts the JSON to Avro.
   *
   * @param args ignored
   * @throws Exception if a file cannot be read or the JSON does not match the schema
   */
  public static void main(String[] args) throws Exception {
    String jsonFile = "d:/aaa.txt";
    String schemaFile = "d:/aaa.avsc";
    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    // JSON text is UTF-8; decode explicitly instead of relying on the platform default charset.
    String json =
        new String(readAllBytes(get(jsonFile)), java.nio.charset.StandardCharsets.UTF_8);
    jsonToAvro(json, schema);
    System.out.println("Done....");
  }

  /**
   * Decodes every record found in {@code json} and writes them to an in-memory Avro data file.
   *
   * @param json one or more JSON-encoded records conforming to {@code schema}
   * @param schema the Avro schema used both to decode the JSON and to write the data file
   * @return the bytes of the resulting Avro object container file
   * @throws IOException if decoding or writing fails
   */
  public static byte[] jsonToAvro(String json, Schema schema) throws IOException {
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    ByteArrayOutputStream output = new ByteArrayOutputStream();

    // try-with-resources closes the writer (flushing any buffered block into `output`) and the
    // input stream on every path, without masking the original exception.
    try (InputStream input =
            new ByteArrayInputStream(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
      writer.create(schema, output);
      Decoder decoder = DecoderFactory.get().jsonDecoder(schema, input);

      while (true) {
        GenericRecord datum;
        try {
          datum = reader.read(null, decoder);
        } catch (EOFException endOfInput) {
          // Reading past the last record is reported as EOFException; that ends the loop.
          break;
        }
        writer.append(datum);
      }
    }

    // The writer is closed at this point, so `output` holds the complete data file.
    System.out.println(output);
    return output.toByteArray();
  }
}
如果联合(union)类型字段的值不为 null,则必须在 JSON 中指明要使用的联合分支。请参考 “Unions and Json Encoding”。
在您的例子中,该分支是 actionTypes,所以 JSON 应该写成:
{
"aaa": {
"eventId": "omar",
"eventAction": {
"test.actionTypes": "Requested"
}
}
}
您可能注意到,联合分支的名称前还要加上命名空间(namespace),即写成 test.actionTypes。
希望对您有所帮助。