Kafka 消息包含控制字符(MongoDB 源连接器)

Kafka message includes control characters (MongoDB Source Connector)

我有一个 Kafka Connect MongoDB Source Connector(都通过 Confluent Platform)工作,但它创建的消息在开头包含一个控制字符,这使得下游解析(到 JSON ) 比我想象的要难得多。

源连接器 运行:

{
    "name": "mongo-source-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://myUsername:myPassword@my-mongodb-host-address:27017",
        "database": "myDatabase",
        "collection": "myCollection",
        "change.stream.full.document": "updateLookup",
        "errors.log.enable": true
    }
}

此源连接器在 Kafka 主题中创建的消息如下(注意前导控制字符):

�{"_id": {"_data": "82609E8726000000012B022C0100296E5A1004BE208B099BCF4106822DE274B0B9D39A46645F69640064609E87267125D17D12D180620004"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1621002022, "i": 1}}, "fullDocument": {"_id": {"$oid": "609e87267125d17d12d18062"}, "uuid": "23534a5c-ad82-431c-a821-6b4aed4f59a1", "endingNumber": 10}, "ns": {"db": "myDatabase", "coll": "myCollection"}, "documentKey": {"_id": {"$oid": "609e87267125d17d12d18062"}}}

控制字符使下游解析到 JSON 变得困难,因为它使原本有效的 JSON 无效。我不知道它为什么在那里或如何摆脱它。

我想,在像 JSON 一样对待它之前,我可以解析出像这个控制字符这样的垃圾,但这似乎是我想避免的创可贴。

我现在处理消息的方式,我认为这是无关紧要的,因为我已经测试过它可以在没有控制字符的情况下使用有效的 JSON,如果重要的话,如下所示:


data class MyChangesetMessageId (
    @JsonProperty("_data")
    val data: String
)

data class MyChangesetMessageTimestamp (
    val t: Long,
    val i: Int
)

data class MyChangesetMessageClusterTime (
    @JsonProperty("$timestamp")
    val timestamp: MyChangesetMessageTimestamp
)

data class MyChangesetOid (
    @JsonProperty("$oid")
    val oid: String
)

data class MyChangesetMessageFullDocument (
    @JsonProperty("_id")
    val id: MyChangesetOid,
    val uuid: String,
    val endingNumber: Int
)

data class MyChangesetMessageNS (
    val db: String,
    val coll: String
)

data class MyChangesetDocumentKey (
    @JsonProperty("_id")
    val id: MyChangesetOid
)

data class MyChangesetMessage (
    @JsonProperty("_id")
    val id: MyChangesetMessageId,
    val operationType: String,
    val clusterTime: MyChangesetMessageClusterTime,
    val fullDocument: MyChangesetMessageFullDocument,
    val ns: MyChangesetMessageNS,
    val documentKey: MyChangesetDocumentKey
)

...

val objectMapper = jacksonObjectMapper()
val changesetMessage = objectMapper.readValue(message, MyChangesetMessage::class.java)

欢迎任何想法。

您所指的字符通常与已解码为字符串的 Avro 序列化数据相同。

检查 Connect worker 中的 key/value 转换器设置,因为您尚未在连接器中定义它。

如果您想解析为 JSON,请使用 JSON 转换器,否则如果您想跳过数据 class 定义并从 Avro 生成数据,Avro 也可以正常工作架构