org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING

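I am doing a simple JSON-to-Avro record conversion, but I keep running into this exception. I have tried many approaches, including more than 15 solutions from Stack Overflow and elsewhere online.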

My schema file looks like this:

{
    "namespace": "test",
    "type": "record",
    "name": "root",
    "doc": "This stream contains raw data.",
    "fields": [
        {
            "name": "aaa",
            "doc": "You should not edit this portion.",
            "type": {
                "type": "record",
                "name": "EnterpriseEventEnvelopeRecord",
                "fields": [
                    {
                        "name": "eventId",
                        "type": "string",
                        "default": "",
                        "doc": "Unique Identifier."
                    },
                    {
                        "name": "eventAction",
                        "type": [
                            "null",
                            {
                                "type": "enum",
                                "name": "actionTypes",
                                "symbols": [
                                    "Updated",
                                    "Created",
                                    "Requested",
                                    "Deleted",
                                    "Verified",
                                    "Received",
                                    "Completed",
                                    "Failed",
                                    "Abandoned"
                                ]
                            }
                        ],
                        "default": null,
                        "doc": "A verb indicating what happened."
                    }
                ]
            }
        }
    ]
}

My input JSON:

{"aaa" : {"eventId" : "omar", "eventAction" : "Requested"}}

My class:

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;

import java.io.*;

import static java.nio.file.Files.readAllBytes;
import static java.nio.file.Paths.get;

public class FeedbackEvent {
    public static void main(String args[]) throws Exception{
        String jsonFile = "d:/aaa.txt";
        String scemaFile = "d:/aaa.avsc";
        Schema schema = new Schema.Parser().parse(new File(scemaFile));
        String json = new String(readAllBytes(get(jsonFile)));
        jsonToAvro(json,schema);
        System.out.println("Done....");
    }

    public static byte[] jsonToAvro(String json, Schema schema) throws IOException {
        InputStream input = null;
        DataFileWriter<GenericRecord> writer = null;
        Encoder encoder = null;
        ByteArrayOutputStream output = null;
        try {
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
            input = new ByteArrayInputStream(json.getBytes());
            output = new ByteArrayOutputStream();
            DataInputStream din = new DataInputStream(input);
            writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
            writer.create(schema, output);
            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
            GenericRecord datum;
            while (true) {
                try {
                    datum = reader.read(null, decoder);
                } catch (EOFException eofe) {
                    break;
                }
                writer.append(datum);
            }
            writer.flush();
            System.out.println(output);
            return output.toByteArray();
        } finally {
            try { input.close(); } catch (Exception e) { }
        }
    }
}

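In Avro's JSON encoding, a non-null value of a union field must name the union branch it uses. See the "Unions" part of the JSON Encoding section in the Avro specification.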

In your case the branch is actionTypes, so the JSON should look like this:

{
  "aaa": {
    "eventId": "omar",
    "eventAction": {
        "test.actionTypes": "Requested"
    }
  }
}
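One way to check that JSON of this shape decodes is sketched below. It assumes the Avro Java library is on the classpath; the class name `UnionJsonCheck` and the inlined, doc-free copy of the schema are only for illustration and are not part of the original code.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;

// Hypothetical helper class, used only to illustrate the fix.
class UnionJsonCheck {
    // Copy of the schema from the question with the "doc" attributes left out.
    static final String SCHEMA_JSON =
        "{\"namespace\":\"test\",\"type\":\"record\",\"name\":\"root\",\"fields\":["
      + "{\"name\":\"aaa\",\"type\":{\"type\":\"record\","
      + "\"name\":\"EnterpriseEventEnvelopeRecord\",\"fields\":["
      + "{\"name\":\"eventId\",\"type\":\"string\",\"default\":\"\"},"
      + "{\"name\":\"eventAction\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"actionTypes\","
      + "\"symbols\":[\"Updated\",\"Created\",\"Requested\",\"Deleted\",\"Verified\","
      + "\"Received\",\"Completed\",\"Failed\",\"Abandoned\"]}],"
      + "\"default\":null}]}}]}";

    // Decodes a single JSON-encoded record against the schema above.
    static GenericRecord decode(String json) throws IOException {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        // The union value is wrapped in an object keyed by the branch's full name.
        String json = "{\"aaa\": {\"eventId\": \"omar\", "
                    + "\"eventAction\": {\"test.actionTypes\": \"Requested\"}}}";
        GenericRecord aaa = (GenericRecord) decode(json).get("aaa");
        System.out.println(aaa.get("eventId") + " / " + aaa.get("eventAction"));
    }
}
```

The same `decode` call can be pointed at the contents of d:/aaa.txt to confirm the file parses before wiring it back into `jsonToAvro`.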

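You may notice that the union branch is written together with its namespace (test.actionTypes, not just actionTypes); the thread gives a clear explanation of this.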
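As a rough illustration of that naming rule, the sketch below builds the JSON text for a union value using only the Java standard library. The class `UnionBranchKey` and its methods are hypothetical helpers, not part of Avro; they assume the value passed in is already valid JSON text.

```java
// Hypothetical helper illustrating how a union value is written in Avro's JSON encoding.
final class UnionBranchKey {
    // Full name of a named type (record, enum, fixed): namespace + "." + name.
    // A name that already contains a dot is treated as fully qualified.
    static String fullName(String namespace, String name) {
        if (name.contains(".") || namespace == null || namespace.isEmpty()) {
            return name;
        }
        return namespace + "." + name;
    }

    // Wraps an already-serialized JSON value in an object keyed by the branch name.
    // The null branch is the exception: it is written as a bare JSON null.
    static String wrap(String branchName, String valueJson) {
        if (valueJson == null) {
            return "null";
        }
        return "{\"" + branchName + "\": " + valueJson + "}";
    }

    public static void main(String[] args) {
        String key = fullName("test", "actionTypes");
        System.out.println(wrap(key, "\"Requested\"")); // {"test.actionTypes": "Requested"}
        System.out.println(wrap(key, null));            // null
    }
}
```

The enum in the question declares no namespace of its own, so it inherits "test" from the enclosing schema, which is why the key comes out as test.actionTypes.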

Hope this helps.