Kafka Connect - JSON 转换器 - JDBC 接收器连接器 - 列类型 JSON
Kafka Connect - JSON Converter - JDBC Sink Connector - Column Type JSON
用例是将整个消息(即 JSON)和键作为记录存储在 table 中,其中有两列 'id' 和 'data'。
数据库为Postgres,支持列类型为JSON。
根据这篇文章,JSONConverter 中支持的类型是 string、int64 等
https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter
是否可以将数据字段的类型设置为 JSON,然后可以将其存储在 Postgres DB 中,列类型为 JSON。
schema = `{
"type":"struct",
"fields":[
{"type":"string", "optional": false, "field":"id"},
{"type":"string", "optional": false, "field":"data"}
]}`
示例数据负载为
"payload": { "id": 10000, "data": {"hello":"world"} }
以上将数据存储为文本,并期望列在 Postgres 中为文本类型。
如果 Postgres 上的列是 JSON 类型,那么 JDBC 接收器连接器将抛出错误。
在 Postgres 上使用 JSON 类型将有助于在 JSON 字段上创建索引等等。是否可以适当地使用 JSONConverter 和 JDBC Sink Converter 来存储列类型为 JSON.
的记录
使用 value.converter.schema.enable=true
,并像这样发送 JSON 数据(将架构作为每条消息的一部分,并使用实际消息数据更新 payload
部分),然后它应该与 JDBC 接收器一起使用。
{
"schema": {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "struct",
"name": "data",
"optional": false,
"fields": [{
"type": "string",
"name": "hello",
"optional":false
}]
}],
"optional": false,
"name": "foobar"
},
"payload": {
"id": 10000,
"data": {"hello":"world"}
}
}
或者您可以考虑将您的客户端转换为使用 Avro,并为您自己节省一些网络带宽。
JDBC 接收器连接器不支持 PostgreSQL json、jsonb 类型。它支持多种原始类型,日期时间。
在下一页,您可以找到将模式类型映射到数据库类型 (PostgreSQL) https://docs.confluent.io/5.1.0/connect/kafka-connect-jdbc/sink-connector/index.html
尽管 JDBC Source 连接器在某些部分支持 json, jsonb 类型 - 这种类型的列不会映射到 STRUCT
,但将映射到 STRING
类型。
用例是将整个消息(即 JSON)和键作为记录存储在 table 中,其中有两列 'id' 和 'data'。
数据库为Postgres,支持列类型为JSON。
根据这篇文章,JSONConverter 中支持的类型是 string、int64 等 https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter
是否可以将数据字段的类型设置为 JSON,然后可以将其存储在 Postgres DB 中,列类型为 JSON。
schema = `{
"type":"struct",
"fields":[
{"type":"string", "optional": false, "field":"id"},
{"type":"string", "optional": false, "field":"data"}
]}`
示例数据负载为
"payload": { "id": 10000, "data": {"hello":"world"} }
以上将数据存储为文本,并期望列在 Postgres 中为文本类型。 如果 Postgres 上的列是 JSON 类型,那么 JDBC 接收器连接器将抛出错误。
在 Postgres 上使用 JSON 类型将有助于在 JSON 字段上创建索引等等。是否可以适当地使用 JSONConverter 和 JDBC Sink Converter 来存储列类型为 JSON.
的记录使用 value.converter.schema.enable=true
,并像这样发送 JSON 数据(将架构作为每条消息的一部分,并使用实际消息数据更新 payload
部分),然后它应该与 JDBC 接收器一起使用。
{
"schema": {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "struct",
"name": "data",
"optional": false,
"fields": [{
"type": "string",
"name": "hello",
"optional":false
}]
}],
"optional": false,
"name": "foobar"
},
"payload": {
"id": 10000,
"data": {"hello":"world"}
}
}
或者您可以考虑将您的客户端转换为使用 Avro,并为您自己节省一些网络带宽。
JDBC 接收器连接器不支持 PostgreSQL json、jsonb 类型。它支持多种原始类型,日期时间。
在下一页,您可以找到将模式类型映射到数据库类型 (PostgreSQL) https://docs.confluent.io/5.1.0/connect/kafka-connect-jdbc/sink-connector/index.html
尽管 JDBC Source 连接器在某些部分支持 json, jsonb 类型 - 这种类型的列不会映射到 STRUCT
,但将映射到 STRING
类型。