How to convert JSON to Parquet in Apache Beam using Java
I am trying to convert the following JSON data:

```
{"col1":"sample-val-1", "col2":1.0}
{"col1":"sample-val-2", "col2":2.0}
{"col1":"sample-val-3", "col2":3.0}
{"col1":"sample-val-4", "col2":4.0}
{"col1":"sample-val-5", "col2":5.0}
```
into Parquet. I wrote the following code in Apache Beam:
```java
package org.apache.beam.examples;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.kitesdk.data.spi.JsonUtil;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;

public class Main {
    public static void main(String[] args) throws IOException {
        Pipeline pipeLine = Pipeline.create();
        PCollection<String> lines = pipeLine.apply("ReadMyFile", TextIO.read().from("path-to-file"));

        // Infer an Avro schema from a sample of the input file using the Kite SDK.
        File initialFile = new File("path-to-file");
        InputStream targetStream = Files.newInputStream(initialFile.toPath());
        Schema jsonSchema = JsonUtil.inferSchema(targetStream, "RecordName", 20);
        System.out.println(jsonSchema.getDoc());

        PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Gson gson = new GsonBuilder().create();
                JsonObject parsedMap = gson.fromJson(c.element(), JsonObject.class);
                // out.output(parsedMap);
                // System.out.println(Arrays.toString(parsedMap.toString().getBytes(StandardCharsets.UTF_8)));
                JsonAvroConverter avroConverter = new JsonAvroConverter();
                // GenericRecord record = avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);
                // context.output(record);
            }
        }));
        pipeLine.run();

        // pgr.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to("path/to/save"));
    }
}
```
I am able to read the JSON line by line, but I cannot convert it to Parquet. If I uncomment the conversion line

```java
GenericRecord record = avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);
```

the pipeline fails with the following error:
```
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    ... 25 more
```
The root cause is that org.apache.avro.Schema is not Serializable, so it cannot be captured in the anonymous DoFn's closure. I fixed this by moving the conversion into a dedicated DoFn class that takes the schema as a constructor argument, stores it as a String, and re-parses it in @Setup:
```java
Schema jsonSchema = new Schema.Parser().parse(schemaString);

pipeLine.apply("ReadMyFile", TextIO.read().from(options.getInput()))
        .apply("Convert Json To General Record", ParDo.of(new JsonToGeneralRecord(jsonSchema)))
        .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema));
```
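For the sample records shown above, the schema string could look something like this (a sketch only; the field names come from the sample data, and the types are my assumption of string for col1 and double for col2):

```java
// Hypothetical Avro schema matching the sample records; adjust the record
// name and field types to fit the real data.
String schemaString = "{"
        + "\"type\":\"record\",\"name\":\"RecordName\",\"fields\":["
        + "{\"name\":\"col1\",\"type\":\"string\"},"
        + "{\"name\":\"col2\",\"type\":\"double\"}]}";
```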
The JsonToGeneralRecord class:

```java
static class JsonToGeneralRecord extends DoFn<String, GenericRecord> {

    private static final Logger logger = LogManager.getLogger(JsonToGeneralRecord.class);

    private final String schemaString;
    private Schema jsonSchema;

    // Avro's Schema is not Serializable, so only the schema string is kept
    // as state; the Schema is rebuilt on each worker in @Setup.
    JsonToGeneralRecord(Schema schema) {
        schemaString = schema.toString();
    }

    @Setup
    public void setup() {
        jsonSchema = new Schema.Parser().parse(schemaString);
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        Gson gson = new GsonBuilder().create();
        JsonObject parsedMap = gson.fromJson(c.element(), JsonObject.class);
        logger.info("successful: " + parsedMap.toString());

        JsonAvroConverter avroConverter = new JsonAvroConverter();
        try {
            GenericRecord record = avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);
            c.output(record);
        } catch (Exception e) {
            logger.error("error: " + e.getMessage() + parsedMap);
            e.printStackTrace();
        }
    }
}
```
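With the elements now typed as GenericRecord, the remaining step is the Parquet write that was commented out in the original code. A minimal sketch, assuming the beam-sdks-java-io-parquet module is on the classpath and that `records` is the PCollection<GenericRecord> produced by the pipeline above (the output path and suffix are placeholders):

```java
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;

// Write the converted records as Parquet files, reusing the same Avro schema.
records.apply("WriteParquet",
        FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(jsonSchema))
                .to("path/to/save")
                .withSuffix(".parquet"));
```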