使用 Apache Camel 进行多态 Json 编组
Polymorphic Json Marshalling with Apache Camel
我正在重构骆驼路线,希望它更通用一些。 (我也在使用 spring 启动,如果这对任何可能的 bean 注入解决方案有帮助的话)
from(fromKafka)
.routeId("Rest Models")
.removeHeaders("*")
.aggregate(new GroupedBodyAggregationStrategy())
.constant(true)
.completionTimeout(batchingInterval)
.process(new ListOfJsonToJsonArray())
.unmarshal().json(JsonLibrary.Jackson, InputArrayPojo.class)
.enrich("seda:rest", mergeRestResult)
处理器 ListOfJsonToJsonArray()
获取 kafka 消息的 json 字符串表示,并加入所有内容,以逗号分隔,在外面有一个 {[ ]}
。
因此,InputArrayPojo.class 是来自 kafka 的对象数组的包装器。我需要捆绑对象,以便在充实中对 REST 接口进行小批量处理。包含的对象格式为 InputPojo.class(实际上只是一个模式,但也执行一些基本的数据质量检查)
我需要一种方法来生成 InputPojo.class,这样对于我们的新工作,我们可以 运行 相同的路线,但提供不同的 InputPojo.class。
我尝试应用多态性并为 InputPojo 创建一个接口,但是 运行 在尝试构建接口时出现错误。
@JsonSubTypes({
@JsonSubTypes.Type(value=InputPojo.class, name = "online")
})
public interface InputPojoInterface {
}
我也尝试了一些参数化,但我也没有运气,因为它不会应用 bean 的构造函数,然后 none 方法存在。
我也加入了
com.fasterxml.jackson.databind.exc.InvalidDefinitionException - Cannot construct instance of `InputPojoInterface` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (ByteArrayInputStream); line: 1, column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"data"
})
public class InputArrayPojo{
@JsonProperty("data")
private List<InputPojo> data = null;
@JsonIgnore
private Map<String, Object> additionalProperties = new HashMap<String, Object>();
@JsonProperty("data")
public List<InputPojo> getData() {
return data;
}
@JsonProperty("data")
public void setData(List<InputPojo> data) {
this.data = data;
}
@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties;
}
@JsonAnySetter
public void setAdditionalProperty(String name, Object value) {
this.additionalProperties.put(name, value);
}
}
丰富还需要实现某种类型的生成逻辑
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
List<ModelResultPojo> outputList = new ArrayList<>();
for (int i = 0; i < originalMessages.size(); ++i) {
ModelResultPojo output = new ModelResultPojo();
IngestionOutPojo raw = originalMessages.get(i);
PredictionPojo enrich = enrichmentMessages.get(i);
/*
enrichment logic to create modelResult
*/
outputList.add(modelResult)
}
newExchange.getIn().setBody(outputList);
return newExchange
}
我最终通过执行以下操作找到了解决方案:
解组为默认类型:Map(未指定 class,它驼峰解组为 Map)
之后我写了一个实现处理器的摘要class。在这个处理器中,我使用地图,并将抽象的 editFields() 函数应用于地图。
因此我现在可以通过 Map 而不是通过 POJO 对业务逻辑进行多态处理。
我正在重构骆驼路线,希望它更通用一些。 (我也在使用 spring 启动,如果这对任何可能的 bean 注入解决方案有帮助的话)
from(fromKafka)
.routeId("Rest Models")
.removeHeaders("*")
.aggregate(new GroupedBodyAggregationStrategy())
.constant(true)
.completionTimeout(batchingInterval)
.process(new ListOfJsonToJsonArray())
.unmarshal().json(JsonLibrary.Jackson, InputArrayPojo.class)
.enrich("seda:rest", mergeRestResult)
处理器 ListOfJsonToJsonArray()
获取 kafka 消息的 json 字符串表示,并加入所有内容,以逗号分隔,在外面有一个 {[ ]}
。
因此,InputArrayPojo.class 是来自 kafka 的对象数组的包装器。我需要捆绑对象,以便在充实中对 REST 接口进行小批量处理。包含的对象格式为 InputPojo.class(实际上只是一个模式,但也执行一些基本的数据质量检查)
我需要一种方法来生成 InputPojo.class,这样对于我们的新工作,我们可以 运行 相同的路线,但提供不同的 InputPojo.class。
我尝试应用多态性并为 InputPojo 创建一个接口,但是 运行 在尝试构建接口时出现错误。
@JsonSubTypes({
@JsonSubTypes.Type(value=InputPojo.class, name = "online")
})
public interface InputPojoInterface {
}
我也尝试了一些参数化,但我也没有运气,因为它不会应用 bean 的构造函数,然后 none 方法存在。
我也加入了
com.fasterxml.jackson.databind.exc.InvalidDefinitionException - Cannot construct instance of `InputPojoInterface` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (ByteArrayInputStream); line: 1, column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"data"
})
public class InputArrayPojo{
@JsonProperty("data")
private List<InputPojo> data = null;
@JsonIgnore
private Map<String, Object> additionalProperties = new HashMap<String, Object>();
@JsonProperty("data")
public List<InputPojo> getData() {
return data;
}
@JsonProperty("data")
public void setData(List<InputPojo> data) {
this.data = data;
}
@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties;
}
@JsonAnySetter
public void setAdditionalProperty(String name, Object value) {
this.additionalProperties.put(name, value);
}
}
丰富还需要实现某种类型的生成逻辑
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
List<ModelResultPojo> outputList = new ArrayList<>();
for (int i = 0; i < originalMessages.size(); ++i) {
ModelResultPojo output = new ModelResultPojo();
IngestionOutPojo raw = originalMessages.get(i);
PredictionPojo enrich = enrichmentMessages.get(i);
/*
enrichment logic to create modelResult
*/
outputList.add(modelResult)
}
newExchange.getIn().setBody(outputList);
return newExchange
}
我最终通过执行以下操作找到了解决方案:
解组为默认类型:Map
之后我写了一个实现处理器的摘要class。在这个处理器中,我使用地图,并将抽象的 editFields() 函数应用于地图。
因此我现在可以通过 Map 而不是通过 POJO 对业务逻辑进行多态处理。