将 Parquet/Avro GenericRecord 写入 JSON,同时维护 LogicalTypes
Writing Parquet/Avro GenericRecord to JSON while maintaining LogicalTypes
我正在尝试将一些包含 LogicalTypes 的 Parquet 记录写入 JSON。我通过 AvroParquetReader
执行此操作,这给了我一个 Avro GenericRecord
:
// Register a logical-type conversion on the GenericData model that is handed to the reader below.
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
// try-with-resources closes the ParquetReader when the block exits.
try (ParquetReader<GenericRecord> parquetReader =
AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
.withDataModel(GenericData.get())
.build()) {
// Read one record from the file as an Avro GenericRecord.
GenericRecord record = parquetReader.read();
// Render the record as text; the returned String is discarded in this snippet.
record.toString();
}
record.toString()
产生:
{"universe_member_id": 94639, "member_from_dt": 2001-08-31T00:00:00Z, "member_to_dt": 2200-01-01T00:00:00Z}
请注意这是无效的 JSON - 日期已根据其 LogicalType
正确转换,但未被引号包围。
所以我尝试了 JsonEncoder
:
// Register a logical-type conversion on the GenericData model that is handed to the reader below.
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); //etc
// Destination stream for the JSON text produced by the encoder.
OutputStream stringOutputStream = new StringOutputStream();
// try-with-resources closes the ParquetReader when the block exits.
try (ParquetReader<GenericRecord> parquetReader =
AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
.withDataModel(GenericData.get())
.build()) {
// Read one record from the file as an Avro GenericRecord.
GenericRecord record = parquetReader.read();
// Writer and JSON encoder are both built from the record's own schema.
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), stringOutputStream);
writer.write(record, encoder);
// Flush so that the encoded output reaches stringOutputStream.
encoder.flush();
}
但这根本不会转换日期字段,而且还把数据类型写进了每条记录的每个字段值中:
{"universe_member_id":{"long":94639},"member_from_dt":{"long":999216000000000},"member_to_dt":{"long":7258118400000000}}
我正在寻找的输出是:
{"universe_member_id": 94639, "member_from_dt": "2001-08-31T00:00:00Z", "member_to_dt": "2200-01-01T00:00:00Z"}
如何正确地将 GenericRecord
写入 JSON?
正如您所指出的,class GenericRecord
中的方法 toString()
将为您提供几乎有效的 JSON 表示。
正如您在 GenericData 类的源代码中所见,GenericData.Record 的 toString 方法在其实现中只是调用了 GenericData 的 toString(Object) 方法。
如果您想要记录的有效 JSON 表示,可以直接采用该代码,只需做少量修改即可得到所需的结果。
例如,我们可以定义一个实用程序 class,如下所示:
package Whosebug.parquetavro;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
public class GenericRecordJsonEncoder {
Map<LogicalType, Function<Object, Object>> logicalTypesConverters = new HashMap<>();
public void registerLogicalTypeConverter(LogicalType logicalType, Function<Object, Object> converter) {
this.logicalTypesConverters.put(logicalType, converter);
}
public Function<Object, Object> getLogicalTypeConverter(Schema.Field field) {
Schema fieldSchema = field.schema();
LogicalType logicalType = fieldSchema.getLogicalType();
return getLogicalTypeConverter(logicalType);
}
public Function<Object, Object> getLogicalTypeConverter(LogicalType logicalType) {
if (logicalType == null) {
return Function.identity();
}
return logicalTypesConverters.getOrDefault(logicalType, Function.identity());
}
public String serialize(GenericRecord value) {
StringBuilder buffer = new StringBuilder();
serialize(value, buffer, new IdentityHashMap<>(128) );
String result = buffer.toString();
return result;
}
private static final String TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT =
" \">>> CIRCULAR REFERENCE CANNOT BE PUT IN JSON STRING, ABORTING RECURSION <<<\" ";
/** Renders a Java datum as <a href="http://www.json.org/">JSON</a>. */
private void serialize(final Object datum, final StringBuilder buffer, final IdentityHashMap<Object, Object> seenObjects) {
if (isRecord(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
buffer.append("{");
int count = 0;
Schema schema = getRecordSchema(datum);
for (Schema.Field f : schema.getFields()) {
serialize(f.name(), buffer, seenObjects);
buffer.append(": ");
Function<Object, Object> logicalTypeConverter = getLogicalTypeConverter(f);
serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects);
if (++count < schema.getFields().size())
buffer.append(", ");
}
buffer.append("}");
seenObjects.remove(datum);
} else if (isArray(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
Collection<?> array = getArrayAsCollection(datum);
buffer.append("[");
long last = array.size()-1;
int i = 0;
for (Object element : array) {
serialize(element, buffer, seenObjects);
if (i++ < last)
buffer.append(", ");
}
buffer.append("]");
seenObjects.remove(datum);
} else if (isMap(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
buffer.append("{");
int count = 0;
@SuppressWarnings(value="unchecked")
Map<Object,Object> map = (Map<Object,Object>)datum;
for (Map.Entry<Object,Object> entry : map.entrySet()) {
serialize(entry.getKey(), buffer, seenObjects);
buffer.append(": ");
serialize(entry.getValue(), buffer, seenObjects);
if (++count < map.size())
buffer.append(", ");
}
buffer.append("}");
seenObjects.remove(datum);
} else if (isString(datum)|| isEnum(datum)) {
buffer.append("\"");
writeEscapedString(datum.toString(), buffer);
buffer.append("\"");
} else if (isBytes(datum)) {
buffer.append("{\"bytes\": \"");
ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
writeEscapedString(StandardCharsets.ISO_8859_1.decode(bytes), buffer);
buffer.append("\"}");
} else if (((datum instanceof Float) && // quote Nan & Infinity
(((Float)datum).isInfinite() || ((Float)datum).isNaN()))
|| ((datum instanceof Double) &&
(((Double)datum).isInfinite() || ((Double)datum).isNaN()))) {
buffer.append("\"");
buffer.append(datum);
buffer.append("\"");
} else if (datum instanceof GenericData) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
serialize(datum, buffer, seenObjects);
seenObjects.remove(datum);
} else {
// This fallback is the reason why GenericRecord toString does not
// generate a valid JSON representation
buffer.append(datum);
}
}
// All these methods are also copied from the GenericData class source
private boolean isRecord(Object datum) {
return datum instanceof IndexedRecord;
}
private Schema getRecordSchema(Object record) {
return ((GenericContainer)record).getSchema();
}
private Object getField(Object record, String name, int position) {
return ((IndexedRecord)record).get(position);
}
private boolean isArray(Object datum) {
return datum instanceof Collection;
}
private Collection getArrayAsCollection(Object datum) {
return (Collection)datum;
}
private boolean isEnum(Object datum) {
return datum instanceof GenericEnumSymbol;
}
private boolean isMap(Object datum) {
return datum instanceof Map;
}
private boolean isString(Object datum) {
return datum instanceof CharSequence;
}
private boolean isBytes(Object datum) {
return datum instanceof ByteBuffer;
}
private void writeEscapedString(CharSequence string, StringBuilder builder) {
for(int i = 0; i < string.length(); i++){
char ch = string.charAt(i);
switch(ch){
case '"':
builder.append("\\"");
break;
case '\':
builder.append("\\");
break;
case '\b':
builder.append("\b");
break;
case '\f':
builder.append("\f");
break;
case '\n':
builder.append("\n");
break;
case '\r':
builder.append("\r");
break;
case '\t':
builder.append("\t");
break;
default:
// Reference: http://www.unicode.org/versions/Unicode5.1.0/
if((ch>='\u0000' && ch<='\u001F') || (ch>='\u007F' && ch<='\u009F') || (ch>='\u2000' && ch<='\u20FF')){
String hex = Integer.toHexString(ch);
builder.append("\u");
for(int j = 0; j < 4 - hex.length(); j++)
builder.append('0');
builder.append(hex.toUpperCase());
} else {
builder.append(ch);
}
}
}
}
}
在此 class 中,您可以为所需的逻辑类型注册转换器。考虑以下示例:
// Create an instance of the encoder class defined above.
GenericRecordJsonEncoder encoder = new GenericRecordJsonEncoder();
// Register as many logical types converters as you need
// NOTE(review): the sample values in the question look like microseconds since the epoch; if the
// fields are declared as timestamp-micros, register the converter for that logical type instead
// (confirm against the file's schema).
encoder.registerLogicalTypeConverter(LogicalTypes.timestampMillis(), o -> {
// The cast assumes the reader delivers these field values as java.time.Instant.
final Instant instant = (Instant)o;
// Returning a String makes the encoder write the value as a quoted JSON string.
final String result = DateTimeFormatter.ISO_INSTANT.format(instant);
return result;
});
// Render the record with the registered converters and print the JSON text.
String json = encoder.serialize(genericRecord);
System.out.println(json);
这将为您提供所需的结果。
我正在尝试将一些包含 LogicalTypes 的 Parquet 记录写入 JSON。我通过 AvroParquetReader
执行此操作,这给了我一个 Avro GenericRecord
:
// Register a logical-type conversion on the GenericData model that is handed to the reader below.
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
// try-with-resources closes the ParquetReader when the block exits.
try (ParquetReader<GenericRecord> parquetReader =
AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
.withDataModel(GenericData.get())
.build()) {
// Read one record from the file as an Avro GenericRecord.
GenericRecord record = parquetReader.read();
// Render the record as text; the returned String is discarded in this snippet.
record.toString();
}
record.toString()
产生:
{"universe_member_id": 94639, "member_from_dt": 2001-08-31T00:00:00Z, "member_to_dt": 2200-01-01T00:00:00Z}
请注意这是无效的 JSON - 日期已根据其 LogicalType
正确转换,但未被引号包围。
所以我尝试了 JsonEncoder
:
// Register a logical-type conversion on the GenericData model that is handed to the reader below.
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); //etc
// Destination stream for the JSON text produced by the encoder.
OutputStream stringOutputStream = new StringOutputStream();
// try-with-resources closes the ParquetReader when the block exits.
try (ParquetReader<GenericRecord> parquetReader =
AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
.withDataModel(GenericData.get())
.build()) {
// Read one record from the file as an Avro GenericRecord.
GenericRecord record = parquetReader.read();
// Writer and JSON encoder are both built from the record's own schema.
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), stringOutputStream);
writer.write(record, encoder);
// Flush so that the encoded output reaches stringOutputStream.
encoder.flush();
}
但这根本不会转换日期字段,而且还把数据类型写进了每条记录的每个字段值中:
{"universe_member_id":{"long":94639},"member_from_dt":{"long":999216000000000},"member_to_dt":{"long":7258118400000000}}
我正在寻找的输出是:
{"universe_member_id": 94639, "member_from_dt": "2001-08-31T00:00:00Z", "member_to_dt": "2200-01-01T00:00:00Z"}
如何正确地将 GenericRecord
写入 JSON?
正如您所指出的,class GenericRecord
中的方法 toString()
将为您提供几乎有效的 JSON 表示。
正如您在 GenericData 类的源代码中所见,GenericData.Record 的 toString 方法在其实现中只是调用了 GenericData 的 toString(Object) 方法。
如果您想要记录的有效 JSON 表示,可以直接采用该代码,只需做少量修改即可得到所需的结果。
例如,我们可以定义一个实用程序 class,如下所示:
package Whosebug.parquetavro;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
public class GenericRecordJsonEncoder {
Map<LogicalType, Function<Object, Object>> logicalTypesConverters = new HashMap<>();
public void registerLogicalTypeConverter(LogicalType logicalType, Function<Object, Object> converter) {
this.logicalTypesConverters.put(logicalType, converter);
}
public Function<Object, Object> getLogicalTypeConverter(Schema.Field field) {
Schema fieldSchema = field.schema();
LogicalType logicalType = fieldSchema.getLogicalType();
return getLogicalTypeConverter(logicalType);
}
public Function<Object, Object> getLogicalTypeConverter(LogicalType logicalType) {
if (logicalType == null) {
return Function.identity();
}
return logicalTypesConverters.getOrDefault(logicalType, Function.identity());
}
public String serialize(GenericRecord value) {
StringBuilder buffer = new StringBuilder();
serialize(value, buffer, new IdentityHashMap<>(128) );
String result = buffer.toString();
return result;
}
private static final String TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT =
" \">>> CIRCULAR REFERENCE CANNOT BE PUT IN JSON STRING, ABORTING RECURSION <<<\" ";
/** Renders a Java datum as <a href="http://www.json.org/">JSON</a>. */
private void serialize(final Object datum, final StringBuilder buffer, final IdentityHashMap<Object, Object> seenObjects) {
if (isRecord(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
buffer.append("{");
int count = 0;
Schema schema = getRecordSchema(datum);
for (Schema.Field f : schema.getFields()) {
serialize(f.name(), buffer, seenObjects);
buffer.append(": ");
Function<Object, Object> logicalTypeConverter = getLogicalTypeConverter(f);
serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects);
if (++count < schema.getFields().size())
buffer.append(", ");
}
buffer.append("}");
seenObjects.remove(datum);
} else if (isArray(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
Collection<?> array = getArrayAsCollection(datum);
buffer.append("[");
long last = array.size()-1;
int i = 0;
for (Object element : array) {
serialize(element, buffer, seenObjects);
if (i++ < last)
buffer.append(", ");
}
buffer.append("]");
seenObjects.remove(datum);
} else if (isMap(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
buffer.append("{");
int count = 0;
@SuppressWarnings(value="unchecked")
Map<Object,Object> map = (Map<Object,Object>)datum;
for (Map.Entry<Object,Object> entry : map.entrySet()) {
serialize(entry.getKey(), buffer, seenObjects);
buffer.append(": ");
serialize(entry.getValue(), buffer, seenObjects);
if (++count < map.size())
buffer.append(", ");
}
buffer.append("}");
seenObjects.remove(datum);
} else if (isString(datum)|| isEnum(datum)) {
buffer.append("\"");
writeEscapedString(datum.toString(), buffer);
buffer.append("\"");
} else if (isBytes(datum)) {
buffer.append("{\"bytes\": \"");
ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
writeEscapedString(StandardCharsets.ISO_8859_1.decode(bytes), buffer);
buffer.append("\"}");
} else if (((datum instanceof Float) && // quote Nan & Infinity
(((Float)datum).isInfinite() || ((Float)datum).isNaN()))
|| ((datum instanceof Double) &&
(((Double)datum).isInfinite() || ((Double)datum).isNaN()))) {
buffer.append("\"");
buffer.append(datum);
buffer.append("\"");
} else if (datum instanceof GenericData) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
serialize(datum, buffer, seenObjects);
seenObjects.remove(datum);
} else {
// This fallback is the reason why GenericRecord toString does not
// generate a valid JSON representation
buffer.append(datum);
}
}
// All these methods are also copied from the GenericData class source
private boolean isRecord(Object datum) {
return datum instanceof IndexedRecord;
}
private Schema getRecordSchema(Object record) {
return ((GenericContainer)record).getSchema();
}
private Object getField(Object record, String name, int position) {
return ((IndexedRecord)record).get(position);
}
private boolean isArray(Object datum) {
return datum instanceof Collection;
}
private Collection getArrayAsCollection(Object datum) {
return (Collection)datum;
}
private boolean isEnum(Object datum) {
return datum instanceof GenericEnumSymbol;
}
private boolean isMap(Object datum) {
return datum instanceof Map;
}
private boolean isString(Object datum) {
return datum instanceof CharSequence;
}
private boolean isBytes(Object datum) {
return datum instanceof ByteBuffer;
}
private void writeEscapedString(CharSequence string, StringBuilder builder) {
for(int i = 0; i < string.length(); i++){
char ch = string.charAt(i);
switch(ch){
case '"':
builder.append("\\"");
break;
case '\':
builder.append("\\");
break;
case '\b':
builder.append("\b");
break;
case '\f':
builder.append("\f");
break;
case '\n':
builder.append("\n");
break;
case '\r':
builder.append("\r");
break;
case '\t':
builder.append("\t");
break;
default:
// Reference: http://www.unicode.org/versions/Unicode5.1.0/
if((ch>='\u0000' && ch<='\u001F') || (ch>='\u007F' && ch<='\u009F') || (ch>='\u2000' && ch<='\u20FF')){
String hex = Integer.toHexString(ch);
builder.append("\u");
for(int j = 0; j < 4 - hex.length(); j++)
builder.append('0');
builder.append(hex.toUpperCase());
} else {
builder.append(ch);
}
}
}
}
}
在此 class 中,您可以为所需的逻辑类型注册转换器。考虑以下示例:
// Create an instance of the encoder class defined above.
GenericRecordJsonEncoder encoder = new GenericRecordJsonEncoder();
// Register as many logical types converters as you need
// NOTE(review): the sample values in the question look like microseconds since the epoch; if the
// fields are declared as timestamp-micros, register the converter for that logical type instead
// (confirm against the file's schema).
encoder.registerLogicalTypeConverter(LogicalTypes.timestampMillis(), o -> {
// The cast assumes the reader delivers these field values as java.time.Instant.
final Instant instant = (Instant)o;
// Returning a String makes the encoder write the value as a quoted JSON string.
final String result = DateTimeFormatter.ISO_INSTANT.format(instant);
return result;
});
// Render the record with the registered converters and print the JSON text.
String json = encoder.serialize(genericRecord);
System.out.println(json);
这将为您提供所需的结果。