Deserialize JSON with Camel Routes
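I am trying to unmarshal JSON data that Debezium produces in a Kafka topic.
My approach is simple: POJOs and the Jackson library. However, because this JSON has a root object (wrapped in "{}"), it throws an error.
This is the JSON I receive; I am only interested in the payload: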
{
  "schema": {
    "type": "struct",
    "fields": [{
      "type": "double",
      "optional": false,
      "field": "codid"
    }, {
      "type": "string",
      "optional": true,
      "field": "__op"
    }, {
      "type": "string",
      "optional": true,
      "field": "__deleted"
    }],
    "optional": false,
    "name": "demo.RESCUE.Value"
  },
  "payload": {
    "codid": 0.0,
    "__op": "r",
    "__deleted": "false"
  }
}
This is my route:
public class Routes extends RouteBuilder {
    public static class MySplitter {
        public List<Payload> splitBody(Rescue data) {
            return data.getPayload().stream().collect(toList());
        }
    }

    @Override
    public void configure() throws Exception {
        from("kafka:{{kafka.source.topic.name}}?brokers={{kafka.bootstrap.address}}&autoOffsetReset=earliest")
            .log("Received body: ${body}")
            .unmarshal().json(JsonLibrary.Jackson, Rescue.class)
            .split().method(MySplitter.class, "splitBody")
            .marshal().json(JsonLibrary.Jackson)
            .convertBodyTo(String.class)
            .log("Output: ${body}");
    }
}
The error I receive:
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.ArrayList<org.demo.pojos.rescue.Payload>` from Object value (token `JsonToken.START_OBJECT`)
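If you are only interested in the payload, you have to extract that object from the full JSON, for example with JSONPath.
Camel supports JSONPath as an expression language, so you could try something like: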
.log("Received body: ${body}") // logs the full JSON
.setBody().jsonpathWriteAsString("$.payload")
.log("Reduced body: ${body}") // should log the new body (only the payload)
...
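To make the elided rest of the route concrete, here is a minimal sketch of how the extracted payload could then be unmarshalled into a POJO. The class names PayloadRoutes and Payload and the field names op and deleted are assumptions for illustration (the original Payload class is not shown in the question), and I have not run this against a real Debezium topic.

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;

// Hypothetical route class; the name is made up for this sketch.
class PayloadRoutes extends RouteBuilder {

    // Hypothetical POJO mirroring the "payload" object of the sample message.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Payload {
        @JsonProperty("codid")
        public double codid;

        // Map the Debezium field names to Java-friendly field names.
        @JsonProperty("__op")
        public String op;

        @JsonProperty("__deleted")
        public String deleted;

        @Override
        public String toString() {
            return "Payload{codid=" + codid + ", op=" + op + ", deleted=" + deleted + "}";
        }
    }

    @Override
    public void configure() {
        from("kafka:{{kafka.source.topic.name}}?brokers={{kafka.bootstrap.address}}&autoOffsetReset=earliest")
            .log("Received body: ${body}")                          // full Debezium envelope
            .setBody().jsonpathWriteAsString("$.payload")           // keep only the payload, as a JSON string
            .unmarshal().json(JsonLibrary.Jackson, Payload.class)   // a single object, not a list
            .log("Payload: ${body}");                               // logs Payload.toString()
    }
}
```

Since each message carries exactly one payload object, the split step from the question should no longer be needed with this approach.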
Note that you need to add the camel-jsonpath dependency:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jsonpath</artifactId>
</dependency>
Or, if you use Spring Boot:
<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-jsonpath-starter</artifactId>
</dependency>
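As a side note on the error itself: the message says Jackson was asked for an ArrayList<Payload> but found a JSON object, which suggests the Rescue class declares payload as a List while the message holds a single object. If you would rather keep the whole envelope than use JSONPath, a sketch along these lines might work. The classes below are assumptions, since the original Rescue and Payload classes are not shown, and EnvelopeDemo is just a hypothetical holder for them.

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

// Hypothetical holder class for this sketch.
class EnvelopeDemo {

    // Envelope POJO: "schema" is skipped, "payload" is a single object.
    @JsonIgnoreProperties(ignoreUnknown = true)
    static class Rescue {
        public Payload payload; // assumption: a single Payload instead of List<Payload>
    }

    @JsonIgnoreProperties(ignoreUnknown = true)
    static class Payload {
        public double codid;

        @JsonProperty("__op")
        public String op;

        @JsonProperty("__deleted")
        public String deleted;
    }

    // Parses a full Debezium-style message into the envelope POJO.
    static Rescue parse(String json) throws IOException {
        return new ObjectMapper().readValue(json, Rescue.class);
    }
}
```

With POJOs shaped like this, the existing .unmarshal().json(JsonLibrary.Jackson, Rescue.class) step should accept the message, and the payload would be available from the Rescue object without a splitter.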