How to convert JSON to Parquet in Apache Beam using Java

I am trying to convert this JSON data

{"col1":"sample-val-1", "col2":1.0}
{"col1":"sample-val-2", "col2":2.0}
{"col1":"sample-val-3", "col2":3.0}
{"col1":"sample-val-4", "col2":4.0}
{"col1":"sample-val-5", "col2":5.0}

and I need to convert it to Parquet.

So I wrote some code in Apache Beam:

package org.apache.beam.examples;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.kitesdk.data.spi.JsonUtil;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;

public class Main {

    public static void main(String[] args) throws IOException {

        Pipeline pipeLine = Pipeline.create();
        PCollection<String> lines = pipeLine.apply("ReadMyFile", TextIO.read().from("path-to-file"));

        File initialFile = new File("path-to-file");
        InputStream targetStream = Files.newInputStream(initialFile.toPath());
        Schema jsonSchema = JsonUtil.inferSchema(targetStream, "RecordName", 20);
        System.out.println(jsonSchema.getDoc());
        PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Gson gson = new GsonBuilder().create();
                JsonObject parsedMap = gson.fromJson(c.element(), JsonObject.class);
//                out.output(parsedMap);
//                System.out.println(Arrays.toString(parsedMap.toString().getBytes(StandardCharsets.UTF_8)));
                JsonAvroConverter avroConverter = new JsonAvroConverter();
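//                NOTE: uncommenting the next line makes this anonymous DoFn capture
//                jsonSchema, and Beam then fails to serialize the DoFn (see the
//                stack trace below)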
//                GenericRecord record =  avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);

//                context.output(record);
            }
        }));
        pipeLine.run();
//        pgr.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to("path/to/save"));
        
    }
}

I am able to read the JSON line by line, but I cannot convert it to Parquet. The code above throws an error as soon as I try to do the actual conversion with

GenericRecord record =  avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);

This is the error caused by that line:

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    ... 25 more

The cause is that Beam has to serialize the DoFn (together with anything it captures) in order to ship it to the workers, and org.apache.avro.Schema is not serializable. I fixed it by creating a new DoFn class that takes the schema as a constructor argument, stores it as a String, and re-parses it in @Setup:

Schema jsonSchema = new Schema.Parser().parse(schemaString);
pipeLine.apply("ReadMyFile", TextIO.read().from(options.getInput()))
                .apply("Convert Json To General Record", ParDo.of(new JsonToGeneralRecord(jsonSchema)))
                .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema));

public static class JsonToGeneralRecord extends DoFn<String, GenericRecord> {

    private static final Logger logger = LogManager.getLogger(JsonToGeneralRecord.class);

    private final String schemaString;
    private Schema jsonSchema;


    // constructor
    JsonToGeneralRecord(Schema schema) {
        schemaString = schema.toString();
    }

    @Setup
    public void setup() {
        jsonSchema = new Schema.Parser().parse(schemaString);
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        Gson gson = new GsonBuilder().create();
        JsonObject parsedMap = gson.fromJson(c.element(), JsonObject.class);
        logger.info("successful: " + parsedMap.toString());

        JsonAvroConverter avroConverter = new JsonAvroConverter();
        try {
            GenericRecord record = avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);
            c.output(record);
        } catch (Exception e) {
            logger.error("error:  " + e.getMessage() + parsedMap);
            e.printStackTrace();
        }

    }
}
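
With a PCollection<GenericRecord> available, the remaining step is the Parquet write itself (the commented-out FileIO line in my original code was on the right track). Below is a minimal sketch of the complete wiring, assuming the two-column schema implied by the sample data and hypothetical input/output paths; it needs the beam-sdks-java-io-parquet dependency, and depending on your Beam version AvroCoder lives in org.apache.beam.sdk.coders or org.apache.beam.sdk.extensions.avro.coders:

// Extra imports on top of the ones in the question (adjust to your Beam version):
// import org.apache.avro.generic.GenericRecord;
// import org.apache.beam.sdk.coders.AvroCoder;
// import org.apache.beam.sdk.io.FileIO;
// import org.apache.beam.sdk.io.parquet.ParquetIO;

// Avro schema matching the sample records: col1 is a string, col2 a double
Schema jsonSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"RecordName\",\"fields\":["
                + "{\"name\":\"col1\",\"type\":\"string\"},"
                + "{\"name\":\"col2\",\"type\":\"double\"}]}");

Pipeline pipeLine = Pipeline.create();
pipeLine.apply("ReadMyFile", TextIO.read().from("path-to-file"))
        .apply("Convert Json To General Record", ParDo.of(new JsonToGeneralRecord(jsonSchema)))
        .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema))
        // ParquetIO.sink encodes each GenericRecord as a Parquet row;
        // FileIO handles sharding and file naming
        .apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(jsonSchema))
                .to("path/to/save")
                .withSuffix(".parquet"));
pipeLine.run().waitUntilFinish();

Note that the DoFn only ever holds the schema as a String, so passing the parsed Schema to its constructor is safe: it is converted to a String immediately and nothing non-serializable is captured.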