将 Parquet/Avro GenericRecord 写入 JSON,同时维护 LogicalTypes

Writing Parquet/Avro GenericRecord to JSON while maintaining LogicalTypes

我正在尝试将一些包含 LogicalTypes 的 Parquet 记录写入 JSON。我通过 AvroParquetReader 执行此操作,这给了我一个 Avro GenericRecord:

// Register a logical-type conversion on the shared GenericData model; the reader below is
// configured with that same model via withDataModel(GenericData.get()).
// NOTE(review): TimeMillisConversion covers time-millis; the timestamps shown in the output
// suggest further (timestamp) conversions were registered too — presumably elided here.
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());

try (ParquetReader<GenericRecord> parquetReader =
    AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
        .withDataModel(GenericData.get())
        .build()) {
    // Reads only the first record of the file.
    GenericRecord record = parquetReader.read();
    // The returned string is discarded here; its content is shown below. Converted values
    // (the dates) are appended without quotes, so the text is not valid JSON.
    record.toString();
}

record.toString() 的输出为:

{"universe_member_id": 94639, "member_from_dt": 2001-08-31T00:00:00Z, "member_to_dt": 2200-01-01T00:00:00Z}

请注意这是无效的 JSON - 日期已根据其 LogicalType 正确转换,但未被引号包围。

所以我尝试了 JsonEncoder:

GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); //etc
// NOTE(review): StringOutputStream is not a JDK class — presumably a helper that collects the
// written bytes into a String; confirm where it comes from.
OutputStream stringOutputStream = new StringOutputStream();

try (ParquetReader<GenericRecord> parquetReader =
    AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
        .withDataModel(GenericData.get())
        .build()) {
    GenericRecord record = parquetReader.read();
    // Writes the record with Avro's JSON encoder. As the output below shows, each nullable
    // value is wrapped in a union-branch object (e.g. {"long": ...}) and the timestamps come
    // out as raw numbers rather than formatted dates.
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), stringOutputStream);
    writer.write(record, encoder);
    // Flush so the buffered JSON actually reaches stringOutputStream.
    encoder.flush();
}

但这样做根本不会转换日期字段,而且还把数据类型嵌入到了每个字段值中:

{"universe_member_id":{"long":94639},"member_from_dt":{"long":999216000000000},"member_to_dt":{"long":7258118400000000}}

我正在寻找的输出是:

{"universe_member_id": 94639, "member_from_dt": "2001-08-31T00:00:00Z", "member_to_dt": "2200-01-01T00:00:00Z"}

如何正确地将 GenericRecord 写入 JSON?

正如您所指出的,GenericRecord 类的 toString() 方法给出的 JSON 表示几乎是有效的。

从 GenericData 类的源代码中可以看到,GenericData.Record 的 toString 方法在实现中只是调用了 GenericData 的 toString(Object) 方法。

如果您需要记录的有效 JSON 表示,可以复制这段代码,只做最少的修改就能得到所需的结果。

例如,我们可以定义如下的工具类:

package Whosebug.parquetavro;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

public class GenericRecordJsonEncoder {

  Map<LogicalType, Function<Object, Object>> logicalTypesConverters = new HashMap<>();

  public void registerLogicalTypeConverter(LogicalType logicalType, Function<Object, Object> converter) {
    this.logicalTypesConverters.put(logicalType, converter);
  }

  public Function<Object, Object> getLogicalTypeConverter(Schema.Field field) {
    Schema fieldSchema = field.schema();
    LogicalType logicalType = fieldSchema.getLogicalType();
    return getLogicalTypeConverter(logicalType);
  }

  public Function<Object, Object> getLogicalTypeConverter(LogicalType logicalType) {
    if (logicalType == null) {
      return Function.identity();
    }

    return logicalTypesConverters.getOrDefault(logicalType, Function.identity());
  }

  public String serialize(GenericRecord value) {
    StringBuilder buffer = new StringBuilder();
    serialize(value, buffer, new IdentityHashMap<>(128) );
    String result = buffer.toString();
    return result;
  }

  private static final String TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT =
      " \">>> CIRCULAR REFERENCE CANNOT BE PUT IN JSON STRING, ABORTING RECURSION <<<\" ";

  /** Renders a Java datum as <a href="http://www.json.org/">JSON</a>. */
  private void serialize(final Object datum, final StringBuilder buffer, final IdentityHashMap<Object, Object> seenObjects) {
    if (isRecord(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      buffer.append("{");
      int count = 0;
      Schema schema = getRecordSchema(datum);
      for (Schema.Field f : schema.getFields()) {
        serialize(f.name(), buffer, seenObjects);
        buffer.append(": ");
        Function<Object, Object> logicalTypeConverter = getLogicalTypeConverter(f);
        serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects);
        if (++count < schema.getFields().size())
          buffer.append(", ");
      }
      buffer.append("}");
      seenObjects.remove(datum);
    } else if (isArray(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      Collection<?> array = getArrayAsCollection(datum);
      buffer.append("[");
      long last = array.size()-1;
      int i = 0;
      for (Object element : array) {
        serialize(element, buffer, seenObjects);
        if (i++ < last)
          buffer.append(", ");
      }
      buffer.append("]");
      seenObjects.remove(datum);
    } else if (isMap(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      buffer.append("{");
      int count = 0;
      @SuppressWarnings(value="unchecked")
      Map<Object,Object> map = (Map<Object,Object>)datum;
      for (Map.Entry<Object,Object> entry : map.entrySet()) {
        serialize(entry.getKey(), buffer, seenObjects);
        buffer.append(": ");
        serialize(entry.getValue(), buffer, seenObjects);
        if (++count < map.size())
          buffer.append(", ");
      }
      buffer.append("}");
      seenObjects.remove(datum);
    } else if (isString(datum)|| isEnum(datum)) {
      buffer.append("\"");
      writeEscapedString(datum.toString(), buffer);
      buffer.append("\"");
    } else if (isBytes(datum)) {
      buffer.append("{\"bytes\": \"");
      ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
      writeEscapedString(StandardCharsets.ISO_8859_1.decode(bytes), buffer);
      buffer.append("\"}");
    } else if (((datum instanceof Float) &&       // quote Nan & Infinity
        (((Float)datum).isInfinite() || ((Float)datum).isNaN()))
        || ((datum instanceof Double) &&
        (((Double)datum).isInfinite() || ((Double)datum).isNaN()))) {
      buffer.append("\"");
      buffer.append(datum);
      buffer.append("\"");
    } else if (datum instanceof GenericData) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      serialize(datum, buffer, seenObjects);
      seenObjects.remove(datum);
    } else {
      // This fallback is the reason why GenericRecord toString does not
      // generate a valid JSON representation
      buffer.append(datum);
    }
  }

  // All these methods are also copied from the GenericData class source

  private boolean isRecord(Object datum) {
    return datum instanceof IndexedRecord;
  }

  private Schema getRecordSchema(Object record) {
    return ((GenericContainer)record).getSchema();
  }

  private Object getField(Object record, String name, int position) {
    return ((IndexedRecord)record).get(position);
  }

  private boolean isArray(Object datum) {
    return datum instanceof Collection;
  }

  private Collection getArrayAsCollection(Object datum) {
    return (Collection)datum;
  }

  private boolean isEnum(Object datum) {
    return datum instanceof GenericEnumSymbol;
  }

  private boolean isMap(Object datum) {
    return datum instanceof Map;
  }

  private boolean isString(Object datum) {
    return datum instanceof CharSequence;
  }

  private boolean isBytes(Object datum) {
    return datum instanceof ByteBuffer;
  }

  private void writeEscapedString(CharSequence string, StringBuilder builder) {
    for(int i = 0; i < string.length(); i++){
      char ch = string.charAt(i);
      switch(ch){
        case '"':
          builder.append("\\"");
          break;
        case '\':
          builder.append("\\");
          break;
        case '\b':
          builder.append("\b");
          break;
        case '\f':
          builder.append("\f");
          break;
        case '\n':
          builder.append("\n");
          break;
        case '\r':
          builder.append("\r");
          break;
        case '\t':
          builder.append("\t");
          break;
        default:
          // Reference: http://www.unicode.org/versions/Unicode5.1.0/
          if((ch>='\u0000' && ch<='\u001F') || (ch>='\u007F' && ch<='\u009F') || (ch>='\u2000' && ch<='\u20FF')){
            String hex = Integer.toHexString(ch);
            builder.append("\u");
            for(int j = 0; j < 4 - hex.length(); j++)
              builder.append('0');
            builder.append(hex.toUpperCase());
          } else {
            builder.append(ch);
          }
      }
    }
  }
}

使用这个类,您可以为所需的逻辑类型注册转换器。请看以下示例:

GenericRecordJsonEncoder encoder = new GenericRecordJsonEncoder();
// Register as many logical-type converters as you need.
// Assumes timestamp conversions are registered on GenericData (as in the question) so that
// timestamp fields arrive as java.time.Instant; any other value is passed through unchanged
// instead of failing on a cast.
Function<Object, Object> instantToIso = o ->
    o instanceof Instant ? DateTimeFormatter.ISO_INSTANT.format((Instant) o) : o;
encoder.registerLogicalTypeConverter(LogicalTypes.timestampMillis(), instantToIso);
// The question's data (e.g. 999216000000000 for 2001-08-31) is in microseconds, so
// timestamp-micros must be registered as well, not only timestamp-millis.
encoder.registerLogicalTypeConverter(LogicalTypes.timestampMicros(), instantToIso);

String json = encoder.serialize(genericRecord);
System.out.println(json);

这将为您提供所需的结果。