使用 kafka 主题创建 table 时,表需要主键
Tables require a primary key when create a table with kafka topic
我有一个 mysql table 是这样的:
我使用 kafka 连接器将此 table 添加到 kafka 主题:
ksql> CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
'database.hostname' = 'mysql',
'database.port' = '3306',
'database.user' = 'debezium',
'database.password' = 'dbz',
'database.server.id' = '42',
'database.server.name' = 'asgard',
'table.whitelist' = 'demo.customers',
'database.history.kafka.bootstrap.servers' = 'kafka:29092',
'database.history.kafka.topic' = 'dbhistory.demo' ,
'include.schema.changes' = 'false',
'transforms'= 'unwrap,extractkey',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractkey.field'= 'id',
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);
然后我想用它来创建一个基于它的 table:
CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
然后我得到这个错误:
Tables require a PRIMARY KEY. Please define the PRIMARY KEY.
Use a partial schema to define the primary key and still load the value columns from the Schema Registry, for example:
CREATE TABLE CUSTOMERS (ID INT PRIMARY KEY) WITH (...);
当我按照建议更改时:
CREATE TABLE CUSTOMERS (id int primary key) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
我收到这个错误:
Duplicate column names: `ID`
做了一些搜索,但仍然卡在这里。这有什么问题,我该如何解决?谢谢
问题是您分配给 PK 列的名称与从架构注册表加载的 Avro 架构中的列名称冲突。
您可以随意命名您的键列,因为列名不会在任何地方保留,因此只需将其命名为不冲突的名称即可,例如customer_id
.
CREATE TABLE CUSTOMERS (customer_id int primary key) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
还注意到您有:
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter'
我相信这会将密钥转换为 STRING
。不确定这是故意的。 ksqlDB 也将与 INT
PK 一起工作...
我有一个 mysql table 是这样的:
ksql> CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
'database.hostname' = 'mysql',
'database.port' = '3306',
'database.user' = 'debezium',
'database.password' = 'dbz',
'database.server.id' = '42',
'database.server.name' = 'asgard',
'table.whitelist' = 'demo.customers',
'database.history.kafka.bootstrap.servers' = 'kafka:29092',
'database.history.kafka.topic' = 'dbhistory.demo' ,
'include.schema.changes' = 'false',
'transforms'= 'unwrap,extractkey',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractkey.field'= 'id',
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);
然后我想用它来创建一个基于它的 table:
CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
然后我得到这个错误:
Tables require a PRIMARY KEY. Please define the PRIMARY KEY.
Use a partial schema to define the primary key and still load the value columns from the Schema Registry, for example:
CREATE TABLE CUSTOMERS (ID INT PRIMARY KEY) WITH (...);
当我按照建议更改时:
CREATE TABLE CUSTOMERS (id int primary key) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
我收到这个错误:
Duplicate column names: `ID`
做了一些搜索,但仍然卡在这里。这有什么问题,我该如何解决?谢谢
问题是您分配给 PK 列的名称与从架构注册表加载的 Avro 架构中的列名称冲突。
您可以随意命名您的键列,因为列名不会在任何地方保留,因此只需将其命名为不冲突的名称即可,例如customer_id
.
CREATE TABLE CUSTOMERS (customer_id int primary key) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
还注意到您有:
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter'
我相信这会将密钥转换为 STRING
。不确定这是故意的。 ksqlDB 也将与 INT
PK 一起工作...