使用 Apache Camel 进行多态 Json 编组

Polymorphic Json Marshalling with Apache Camel

我正在重构骆驼路线,希望它更通用一些。 (我也在使用 spring 启动,如果这对任何可能的 bean 注入解决方案有帮助的话)

from(fromKafka)
                .routeId("Rest Models")
                .removeHeaders("*")
                .aggregate(new GroupedBodyAggregationStrategy())
                .constant(true)
                .completionTimeout(batchingInterval)
                .process(new ListOfJsonToJsonArray())
                .unmarshal().json(JsonLibrary.Jackson, InputArrayPojo.class)
                .enrich("seda:rest", mergeRestResult)

处理器 ListOfJsonToJsonArray() 获取 kafka 消息的 json 字符串表示,并加入所有内容,以逗号分隔,在外面有一个 {[ ]}

因此,InputArrayPojo.class 是来自 kafka 的对象数组的包装器。我需要捆绑对象,以便在充实中对 REST 接口进行小批量处理。包含的对象格式为 InputPojo.class(实际上只是一个模式,但也执行一些基本的数据质量检查)

我需要一种方法来生成 InputPojo.class,这样对于我们的新工作,我们可以 运行 相同的路线,但提供不同的 InputPojo.class。

我尝试应用多态性并为 InputPojo 创建一个接口,但是 运行 在尝试构建接口时出现错误。

@JsonSubTypes({
        @JsonSubTypes.Type(value=InputPojo.class, name = "online")
})
public interface InputPojoInterface {
}

我也尝试了一些参数化,但我也没有运气,因为它不会应用 bean 的构造函数,然后 none 方法存在。

我也加入了

com.fasterxml.jackson.databind.exc.InvalidDefinitionException - Cannot construct instance of `InputPojoInterface` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (ByteArrayInputStream); line: 1, column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
        "data"
})
public class InputArrayPojo{

    @JsonProperty("data")
    private List<InputPojo> data = null;
    @JsonIgnore
    private Map<String, Object> additionalProperties = new HashMap<String, Object>();

    @JsonProperty("data")
    public List<InputPojo> getData() {
        return data;
    }

    @JsonProperty("data")
    public void setData(List<InputPojo> data) {
        this.data = data;
    }

    @JsonAnyGetter
    public Map<String, Object> getAdditionalProperties() {
        return this.additionalProperties;
    }

    @JsonAnySetter
    public void setAdditionalProperty(String name, Object value) {
        this.additionalProperties.put(name, value);
    }

}

丰富还需要实现某种类型的生成逻辑

@Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
        List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
        List<ModelResultPojo> outputList = new ArrayList<>();

        for (int i = 0; i < originalMessages.size(); ++i) {
            ModelResultPojo output = new ModelResultPojo();
            IngestionOutPojo raw = originalMessages.get(i);
            PredictionPojo enrich = enrichmentMessages.get(i);
            /*
            enrichment logic to create modelResult
            */
            outputList.add(modelResult)
    }
    newExchange.getIn().setBody(outputList);
    return newExchange
}

我最终通过执行以下操作找到了解决方案:

解组为默认类型:Map(未指定 class,它驼峰解组为 Map

之后我写了一个实现处理器的摘要class。在这个处理器中,我使用地图,并将抽象的 editFields() 函数应用于地图。

因此我现在可以通过 Map 而不是通过 POJO 对业务逻辑进行多态处理。