Deserialize JSON with Camel Routes

I am trying to unmarshal JSON data that Debezium produces into a Kafka topic.

My approach is simple: use POJOs and the Jackson library. However, because this JSON has a root object (enclosed in "{}"), it throws an error.

This is the JSON I receive; I am only interested in the payload:

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "double",
            "optional": false,
            "field": "codid"
        }, {
            "type": "string",
            "optional": true,
            "field": "__op"
        }, {
            "type": "string",
            "optional": true,
            "field": "__deleted"
        }],
        "optional": false,
        "name": "demo.RESCUE.Value"
    },
    "payload": {
        "codid": 0.0,
        "__op": "r",
        "__deleted": "false"
    }
}

This is my route:

public class Routes extends RouteBuilder{

    public static class MySplitter {
        public List<Payload> splitBody(Rescue data) {
            return data.getPayload().stream().collect(toList());
        }
    }

    @Override
    public void configure() throws Exception {

        from("kafka:{{kafka.source.topic.name}}?brokers={{kafka.bootstrap.address}}&autoOffsetReset=earliest")
        .log("Received body: ${body}")
            .unmarshal().json(JsonLibrary.Jackson, Rescue.class)
            .split().method(MySplitter.class, "splitBody")
            .marshal().json(JsonLibrary.Jackson)
            .convertBodyTo(String.class)
            .log("Output: ${body}");
    }
    
}

The error I get:

com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.ArrayList<org.demo.pojos.rescue.Payload>` from Object value (token `JsonToken.START_OBJECT`)

If you are only interested in the payload, you have to extract that object from the whole JSON, for example with JSONPath.

Camel supports JSONPath as an expression language, so you could try something like:

.log("Received body: ${body}")  // logs the full JSON
.setBody().jsonpathWriteAsString("$.payload")
.log("Reduced body: ${body}")   // should log the new body (only the payload)
...
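Combined with the route from the question, this could look roughly like the sketch below. It assumes the Kafka message body arrives as a String and that `payload` is a single JSON object, as in the sample above (which would also explain why Jackson refuses to bind it to an `ArrayList<Payload>`). The class name `PayloadRoute` and the shape of the `Payload` POJO are assumptions made for illustration; the real `org.demo.pojos.rescue.Payload` may differ.

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;

// Hypothetical route class: extracts only the "payload" object and maps it to a POJO.
public class PayloadRoute extends RouteBuilder {

    // Assumed POJO mirroring the three payload fields of the sample message.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Payload {
        @JsonProperty("codid")
        public double codid;

        @JsonProperty("__op")
        public String op;

        @JsonProperty("__deleted")
        public String deleted;

        @Override
        public String toString() {
            return "Payload{codid=" + codid + ", op=" + op + ", deleted=" + deleted + "}";
        }
    }

    @Override
    public void configure() throws Exception {
        from("kafka:{{kafka.source.topic.name}}?brokers={{kafka.bootstrap.address}}&autoOffsetReset=earliest")
            .log("Received body: ${body}")
            // Replace the body with the JSON text of the payload object only
            .setBody().jsonpathWriteAsString("$.payload")
            .log("Reduced body: ${body}")
            // Bind the single payload object to one Payload instance (no list, no split)
            .unmarshal().json(JsonLibrary.Jackson, Payload.class)
            .log("Payload: ${body}");
    }
}
```

Because each message then carries exactly one payload object, the `MySplitter` step from the question would no longer be needed in this variant.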

Note that you need to add the camel-jsonpath dependency:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jsonpath</artifactId>
</dependency>

or, if you use Spring Boot:

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-jsonpath-starter</artifactId>
</dependency>