Kafka Connect - JDBC Sink connector - tries to add AvroConverter Schema Registry details into target table as new columns

I am trying to load data from a Kafka cluster into an Oracle database. The data in the cluster was loaded using the AvroConverter, and the schema also contains fields such as source, op, and ts_ms. My problem is that when I try to read the data from the cluster, the connector tries to add these schema fields as additional columns to my target table and throws an error:

 INFO Unable to find fields [SinkRecordField{schema=Schema{Tab1.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, SinkRecordField{schema=Schema{Tab1.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}] among column names [col1, col2, col3, col4] (io.confluent.connect.jdbc.sink.DbStructure)
[2022-04-10 17:18:04,181] ERROR WorkerSinkTask{id=jdbc-conn} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "Tab1" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "Tab1" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:182)
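
For context, those extra fields come from the Debezium change-event envelope that wraps every row. A rough sketch of what a single message value on the topic looks like (the column values and source fields below are illustrative, not copied from my actual data):

{
  "before": null,
  "after": { "col1": "v1", "col2": "v2", "col3": "v3", "col4": "v4" },
  "source": { "connector": "oracle", "db": "customer", "table": "Tab1" },
  "op": "c",
  "ts_ms": 1649603884181,
  "transaction": null
}

The JDBC sink maps every top-level field of the value to a column, which is why it attempts to ALTER the table to add before, after, source, op, ts_ms, and transaction.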

I'll put my connector details below (running in a Docker container; connection strings redacted for clarity):

{
  "name": "jdbc-conn",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "name": "jdbc-conn",
    "tasks.max": "1",
    "topics.regex": "customer.*",
    "transforms": "changeTopicName",
    "transforms.changeTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeTopicName.regex": "customer.(.*)",
    "transforms.changeTopicName.replacement": "",
    "connection.url": "jdbc:oracle:thin:user/pass@localhost:1521:customer",
    "connection.user": "user",
    "connection.password": "pass",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": ""
  },
  "tasks": [],
  "type": "sink"
}

Docker container

FROM confluentinc/cp-kafka-connect-base:latest

RUN   confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest

Initialized with:

docker run -d \
  --name=connect \
  --net=host \
  -e CONNECT_BOOTSTRAP_SERVERS="localhost:9092" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_CONSUMER_BOOTSTRAP_SERVERS="localhost:9092" \
  -e CONNECT_PRODUCER_BOOTSTRAP_SERVERS="localhost:9092" \
  -e CONNECT_GROUP_ID="sink_group1" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="connect-sink-offsets" \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR="3" \
  -e CONNECT_OFFSET_STORAGE_PARTITIONS="3" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="connect-sink-configs" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR="3" \
  -e CONNECT_STATUS_STORAGE_TOPIC="connect-sink-status" \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR="3" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="xxx" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="yyy" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="xxx" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="yyy" \
  -e CONNECT_CONFLUENT_TOPIC_BOOTSTRAP_SERVERS="localhost:9092" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL="INFO" \
  -e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components \
  -v /home/cdcadmin/oracle_dest_connector/mounted/jars:/etc/kafka-connect/jars \
  jdbc-conn:latest

I don't want these fields added at all. What am I missing here?

the schema also contains fields such as source, op, and ts_ms

You need to install and configure the Debezium ExtractNewRecordState transformation to remove them.

https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
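
A sketch of how the transform could be chained in front of your existing RegexRouter in the sink config (the property names come from the linked Debezium docs; the "unwrap" alias is arbitrary):

"transforms": "unwrap,changeTopicName",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.changeTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopicName.regex": "customer.(.*)",
"transforms.changeTopicName.replacement": ""

With the record unwrapped to just the "after" state, the sink only sees the row columns (col1..col4) instead of the envelope fields.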

Credit to OneCricketeer's answer and comments for pointing to https://debezium.io/documentation/reference/stable/transformations/event-flattening.html

Problem solved. What I was missing was a properly installed debezium-core. I expected ExtractNewRecordState to be part of confluentinc/kafka-connect-jdbc, but it is not. Since I need to stick with confluent-hub, I decided to install debezium/debezium-connector-mysql (there is no Debezium Oracle connector available on Confluent Hub), which bundles the transformation. My Dockerfile is now:

FROM confluentinc/cp-kafka-connect-base:latest

RUN   confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
RUN   confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
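
To double-check that the transformation actually made it into the image, the confluent-hub install directory inside the running container can be listed (/usr/share/confluent-hub-components is the confluent-hub default and is already on the plugin.path above):

docker exec connect ls /usr/share/confluent-hub-components

Both confluentinc-kafka-connect-jdbc and debezium-debezium-connector-mysql should show up.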

Finally, the logs say:

 INFO Added plugin 'io.debezium.transforms.ExtractNewRecordState' 

The "Unable to find fields" error is resolved.

Now I've hit another one :) "Error: Unsupported source data type: STRUCT", but that is a different issue. Thanks again.