如何将我的 JsonObject (com.google.gson.JsonObject) 转换为 GenericRecord (org.apache.avro.generic.GenericRecord) 类型

How to convert my JsonObject (com.google.gson.JsonObject) to GenericRecord (org.apache.avro.generic.GenericRecord) type

我们正在创建数据流管道,它将获取 JSON 并写入镶木地板文件。我们正在使用 org.apache.beam.sdk.io.parquet 包来编写文件。 ParquetIO.Sink 允许您将 GenericRecord 的 PCollection 写入 Parquet 文件(来自此处 https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html)。现在我们知道如何将 JsonObject(具有复杂结构)转换为 GenericRecord。

我们尝试使用 GenericRecordBuilder (org.apache.avro.generic.GenericRecordBuilder) 生成 GenericRecord。并且我们正在使用来自 com.google.gson.JsonObject 的 JsonObject 但是我们遇到了如何为 JsonArray with Ojects

转换生成 GenericRecord 的问题

我们的样本Json

{
    "event_name": "added_to_cart",
    "event_id": "AMKL9877",
    "attributes": [
        {"key": "total", "value": "8982", "type": "double"},
        {"key": "order_id", "value": "AKM1011", "type": "string"}
    ]
}

我们的架构

{  
    "type":"record",
    "name":"event",
    "fields":[  
        {  
        "name":"event_name",
        "type":"string"
        },
        {  
        "name":"event_id",
        "type":"string"
        },
        {  
        "name":"attributes",
        "type":{  
            "type":"array",
            "items":{  
            "type":"record",
            "name":"attribute_data",
            "fields":[  
                {  
                "name":"key",
                "type":"string"
                },
                {  
                "name":"value",
                "type":"string"
                },
                {  
                "name":"type",
                "type":"string"
                }
            ]
            }
        }
        }
    ]
}

我们的代码曾经使用 GenericRecordBuilder

将 JsonObject 转换为 GenericRecord
JsonObject event = element.getAsJsonObject();
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(SCHEMA);

for (Schema.Field field:SCHEMA.getFields()) {
    System.out.println(field);
    String at_header = field.getProp(FIELD_AT_HEADER_PROPERTY);
    System.out.println(at_header);
    if(at_header != null && at_header.equals(Boolean.TRUE.toString())){
        recordBuilder.set(field.name(), null);
    }else{
        JsonElement keyElement = event.get(field.name());
        recordBuilder.set(field.name(), getElementAsType(field.schema(), keyElement));
    }
}

return recordBuilder.build();


Object getElementAsType(Schema schema, JsonElement element) { 
    if(element == null || element.isJsonNull())
        return null;
    switch(schema.getType()){
    case BOOLEAN:
        return element.getAsBoolean();
    case DOUBLE:
        return element.getAsDouble();
    case FLOAT:
        return element.getAsFloat();
    case INT:
        return element.getAsInt();
    case LONG:
        return element.getAsLong();
    case NULL:
        return null;
    case ARRAY:
        ???
    case MAP:
        ???            
    default:
        return element.getAsString();
}

我们需要知道如何为对象数组等复杂类型构建 GenericRecord,从 JSON 映射。提前致谢。

我从这个页面找到了我的答案https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/generic/package-summary.html

Avro 数据的通用表示。

这种表示最适合处理动态数据的应用程序,其架构在运行时才知道。

Avro 模式映射到 Java 类型如下:

  • 模式记录实现为GenericRecord
  • 架构 enums 实现为 GenericEnumSymbol.
  • 架构数组实现为集合
  • 架构地图实现为地图
  • 架构 fixed 实现为 GenericFixed.
  • 架构 字符串 实现为 CharSequence.
  • 架构字节实现为字节缓冲区
  • 架构 ints 实现为 Integer.
  • 架构 longs 实现为 Long.
  • 架构 floats 实现为 Float.
  • 架构 double 实现为 Double.
  • 架构布尔值实现为布尔值