如何将带有大写键的 json 数据下沉到 Postgres table 中?
How to sink json data with keys in Uppercase into Postgres table?
假设我有一个主题(用户)有 json 数据但没有架构。
数据示例:
{"id":3151212170,"name":"John Wick"}
为了解决这个问题,我创建了一个流 (user_stream) 来从主题中获取数据并基本上为它提供一个模式。
create stream user_stream (id bigint, name string) with (kafka_topic='user', value_format='JSON', key = 'id');
然后为了使用数据我创建了另一个流:
create stream user_final with (value_format = 'AVRO') as select * from USER_STREAM;
注意:数据现在采用带有架构的 Avro 格式,但列现在采用大写形式。
我正在使用 Kafka 的 JdbcSinkConnector 将数据汇入现有的 Postgres Table。
Postgres Table 示例:
create table mytable (id bigint primary key, name text)
接收器连接器配置:
{
"name": "postgres-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "USER_FINAL",
"key.converter.schema.registry.url": "http://schema-reg-url:8081",
"value.converter.schema.registry.url": "http://schema-reg-url:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"connection.url": "jdbc:postgresql://postgres-url:5432/mydbname?user=username&password=password",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"table.name.format": "mytable",
"pk.mode": "none",
"insert.mode": "insert"
}
}
问题是流列名是大写,而Postgres table的列名是小写 ].即使我在 Postgres 中创建一个带有大写列名的 table,它也会转换为小写。
错误:
org.postgresql.util.PSQLException: ERROR: column "ID" of relation "mytable" does not exist
有解决办法吗?我愿意接受建议。
Even if I create a table in Postgres with uppercase column names it
just converts to Lowercase.
如果您的列名没有放在双引号中,PostgreSQL 会将它们转换为小写格式。
因此,
- 引号使列名区分大小写
- 而未加引号的列名总是折叠成小写
在 Kafka Connect 端,您可以使用 Kafka Connect Single Message Transofrms (SMT) 更改字段名称。
更准确地说,ReplaceField
允许您重命名字段。例如,以下转换会将列名称 COL1
和 COL2
分别替换为 col1
和 col2
:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "COL1:col1,COL2:col2"
假设我有一个主题(用户)有 json 数据但没有架构。
数据示例:
{"id":3151212170,"name":"John Wick"}
为了解决这个问题,我创建了一个流 (user_stream) 来从主题中获取数据并基本上为它提供一个模式。
create stream user_stream (id bigint, name string) with (kafka_topic='user', value_format='JSON', key = 'id');
然后为了使用数据我创建了另一个流:
create stream user_final with (value_format = 'AVRO') as select * from USER_STREAM;
注意:数据现在采用带有架构的 Avro 格式,但列现在采用大写形式。
我正在使用 Kafka 的 JdbcSinkConnector 将数据汇入现有的 Postgres Table。
Postgres Table 示例:
create table mytable (id bigint primary key, name text)
接收器连接器配置:
{
"name": "postgres-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "USER_FINAL",
"key.converter.schema.registry.url": "http://schema-reg-url:8081",
"value.converter.schema.registry.url": "http://schema-reg-url:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"connection.url": "jdbc:postgresql://postgres-url:5432/mydbname?user=username&password=password",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"table.name.format": "mytable",
"pk.mode": "none",
"insert.mode": "insert"
}
}
问题是流列名是大写,而Postgres table的列名是小写 ].即使我在 Postgres 中创建一个带有大写列名的 table,它也会转换为小写。
错误:
org.postgresql.util.PSQLException: ERROR: column "ID" of relation "mytable" does not exist
有解决办法吗?我愿意接受建议。
Even if I create a table in Postgres with uppercase column names it just converts to Lowercase.
如果您的列名没有放在双引号中,PostgreSQL 会将它们转换为小写格式。
因此,
- 引号使列名区分大小写
- 而未加引号的列名总是折叠成小写
在 Kafka Connect 端,您可以使用 Kafka Connect Single Message Transofrms (SMT) 更改字段名称。
更准确地说,ReplaceField
允许您重命名字段。例如,以下转换会将列名称 COL1
和 COL2
分别替换为 col1
和 col2
:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "COL1:col1,COL2:col2"