Kafka Connect - JDBC Sink connector - tries to add AvroConverter Schema Registry details into target table as new columns
I am trying to load data from a Kafka cluster into an Oracle database. The data in the cluster was produced with the AvroConverter, and the schema also contains fields such as source, op, ts_ms, etc. My problem is that when I try to read the data from the cluster, the connector tries to add these schema fields as additional columns to my target table and throws an error:
INFO Unable to find fields [SinkRecordField{schema=Schema{Tab1.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, SinkRecordField{schema=Schema{Tab1.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}] among column names [col1, col2, col3, col4] (io.confluent.connect.jdbc.sink.DbStructure)
[2022-04-10 17:18:04,181] ERROR WorkerSinkTask{id=jdbc-conn} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "Tab1" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "Tab1" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value
at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:182)
My connector details are below (running in a Docker container; connection strings removed for clarity):
{
  "name": "jdbc-conn",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "name": "jdbc-conn",
    "tasks.max": "1",
    "topics.regex": "customer.*",
    "transforms": "changeTopicName",
    "transforms.changeTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeTopicName.regex": "customer.(.*)",
    "transforms.changeTopicName.replacement": "",
    "connection.url": "jdbc:oracle:thin:user/pass@localhost:1521:customer",
    "connection.user": "user",
    "connection.password": "pass",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": ""
  },
  "tasks": [],
  "type": "sink"
}
Docker container:
FROM confluentinc/cp-kafka-connect-base:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
Initialized with:
docker run -d \
--name=connect \
--net=host \
-e CONNECT_BOOTSTRAP_SERVERS="localhost:9092" \
-e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_CONSUMER_BOOTSTRAP_SERVERS="localhost:9092" \
-e CONNECT_PRODUCER_BOOTSTRAP_SERVERS="localhost:9092" \
-e CONNECT_GROUP_ID="sink_group1" \
-e CONNECT_OFFSET_STORAGE_TOPIC="connect-sink-offsets" \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR="3" \
-e CONNECT_OFFSET_STORAGE_PARTITIONS="3" \
-e CONNECT_CONFIG_STORAGE_TOPIC="connect-sink-configs" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR="3" \
-e CONNECT_STATUS_STORAGE_TOPIC="connect-sink-status" \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR="3" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="xxx" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="yyy" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="xxx" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="yyy" \
-e CONNECT_CONFLUENT_TOPIC_BOOTSTRAP_SERVERS="localhost:9092" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL="INFO" \
-e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components \
-v /home/cdcadmin/oracle_dest_connector/mounted/jars:/etc/kafka-connect/jars \
jdbc-conn:latest
I do not want these fields to be added at all. What am I missing here?
the schema details contains also fields like source, op, ts_ms etc
You need to install and configure the Debezium ExtractNewRecordState transform to remove them.
https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
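Wired into the sink config, the event-flattening transform from that page might look like the fragment below. This is a sketch, not a verified config: the alias "unwrap" and its position before the existing changeTopicName router are my assumptions, and only the documented Debezium options are used.

```json
"transforms": "unwrap,changeTopicName",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
```

With this in place the sink should receive only the contents of the "after" struct instead of the full Debezium envelope (before, after, source, op, ts_ms, transaction).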
Per OneCricketeer's answer and comment, I used https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
Problem solved. What I was missing was a proper installation of debezium-core. I expected ExtractNewRecordState to be part of confluentinc/kafka-connect-jdbc, but it is not. Since I need to stick with confluent-hub, I decided to install debezium/debezium-connector-mysql (there is no Debezium Oracle connector available on Confluent Hub).
My Dockerfile is now:
FROM confluentinc/cp-kafka-connect-base:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
RUN confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
The logs finally say:
INFO Added plugin 'io.debezium.transforms.ExtractNewRecordState'
The "Unable to find fields" error is resolved.
Now I've hit another one :) "Error: Unsupported source data type: STRUCT", but that is a different question now. Thanks again.
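A hedged note for future readers: "Unsupported source data type: STRUCT" usually means a nested struct is still reaching the JDBC sink after unwrapping. One untested option is Kafka Connect's built-in Flatten transform, chained after the unwrap step (the alias "flatten" and the delimiter choice are mine):

```json
"transforms": "unwrap,flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
```

This would turn a nested field like address.city into a flat column address_city, which the JDBC sink can map to a table column.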