Confluent JDBC Sink Connector can't recognize records captured by the Debezium connector

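I'm trying to do CDC on PostgreSQL using the Kafka ecosystem. The Debezium connector captures changes from PostgreSQL 13 and produces them to Kafka. I attached the Confluent JDBC sink connector to the other side of Kafka, but it doesn't recognize the JSON messages properly.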

Test environment:

Test table:

create table PERSONS (id integer not null, name varchar(50) not null, nickname varchar(50), primary key(id));

JSON message produced by Debezium for an update event (captured with the console consumer):

{

"schema":{
    "type":"struct",
    "fields":[
        {
            "type":"struct",
            "fields":[
                {
                    "type":"int32",
                    "optional":false,
                    "field":"id"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"name"
                },
                {
                    "type":"string",
                    "optional":true,
                    "field":"nickname"
                }
            ],
            "optional":true,
            "name":"localdb_postgres.public.persons.Value",
            "field":"before"
        },
        {
            "type":"struct",
            "fields":[
                {
                    "type":"int32",
                    "optional":false,
                    "field":"id"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"name"
                },
                {
                    "type":"string",
                    "optional":true,
                    "field":"nickname"
                }
            ],
            "optional":true,
            "name":"localdb_postgres.public.persons.Value",
            "field":"after"
        },
        {
            "type":"struct",
            "fields":[
                {
                    "type":"string",
                    "optional":false,
                    "field":"version"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"connector"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"name"
                },
                {
                    "type":"int64",
                    "optional":false,
                    "field":"ts_ms"
                },
                {
                    "type":"string",
                    "optional":true,
                    "name":"io.debezium.data.Enum",
                    "version":1,
                    "parameters":{
                        "allowed":"true,last,false,incremental"
                    },
                    "default":"false",
                    "field":"snapshot"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"db"
                },
                {
                    "type":"string",
                    "optional":true,
                    "field":"sequence"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"schema"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"table"
                },
                {
                    "type":"int64",
                    "optional":true,
                    "field":"txId"
                },
                {
                    "type":"int64",
                    "optional":true,
                    "field":"lsn"
                },
                {
                    "type":"int64",
                    "optional":true,
                    "field":"xmin"
                }
            ],
            "optional":false,
            "name":"io.debezium.connector.postgresql.Source",
            "field":"source"
        },
        {
            "type":"string",
            "optional":false,
            "field":"op"
        },
        {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
        },
        {
            "type":"struct",
            "fields":[
                {
                    "type":"string",
                    "optional":false,
                    "field":"id"
                },
                {
                    "type":"int64",
                    "optional":false,
                    "field":"total_order"
                },
                {
                    "type":"int64",
                    "optional":false,
                    "field":"data_collection_order"
                }
            ],
            "optional":true,
            "field":"transaction"
        }
    ],
    "optional":false,
    "name":"localdb_postgres.public.persons.Envelope"
},
"payload":{
    "before":null,
    "after":{
        "id":3,
        "name":"Ko Youngrok",
        "nickname":"SuperMen"
    },
    "source":{
        "version":"1.9.2.Final",
        "connector":"postgresql",
        "name":"localdb_postgres",
        "ts_ms":1653878655898,
        "snapshot":"false",
        "db":"postgres",
        "sequence":"[\"23092016\",\"23092304\"]",
        "schema":"public",
        "table":"persons",
        "txId":516,
        "lsn":23092304,
        "xmin":null
    },
    "op":"u",
    "ts_ms":1653878656372,
    "transaction":null
}
}

Error log from the Confluent JDBC sink connector:

[2022-05-30 15:45:26,750] INFO Unable to find fields [SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}, SinkRecordField{schema=Schema{localdb_postgres.public.persons.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, SinkRecordField{schema=Schema{localdb_postgres.public.persons.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}] among column names [nickname, id, name] (io.confluent.connect.jdbc.sink.DbStructure:276)

[2022-05-30 15:45:26,752] ERROR WorkerSinkTask{id=conflue-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "localdb_postgres"."public"."persons" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:608) io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "localdb_postgres"."public"."persons" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value

Is there a way to solve this problem?

This is the Confluent connector configuration:

name=conflue-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=blahblah
connection.user=blahblah
connection.password=blahblah
dialect.name=PostgreSqlDatabaseDialect
insert.mode=insert
pk.mode=none
pk.fields=none
batch.size=3000
delete.enabled=false
topics=localdb_postgres.public.persons

Unable to find fields ... source, ts_ms, after, op, before, transaction ...   among column names [nickname, id, name]

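The JDBC sink connector only infers columns from the top-level fields of the records it consumes. In a Debezium change event those top-level fields are before, after, source, op, ts_ms and transaction, not the table's own columns (id, name, nickname), which are nested inside before and after.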

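If you only want to write the after payload, for example, you need to extract it. If you keep using JSONConverter, you will still get schema and payload in the message, but they will only describe that extracted data.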
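One way to do the extraction is a single message transform (SMT) on the sink connector. The lines below are a minimal sketch, not a verified configuration for your setup: they assume the Debezium jars are available on the sink worker's plugin path, and the alias `unwrap` is just a name chosen for this example. The converter lines are also an assumption, based on the JSON-with-schema messages shown in the question.

```properties
# Sketch: flatten the Debezium envelope so only the "after" row state
# reaches the JDBC sink. "unwrap" is an arbitrary alias for this example.
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Tombstone records are dropped here because delete.enabled=false in the sink.
transforms.unwrap.drop.tombstones=true

# Assumed converter settings, matching the schema+payload JSON in the question.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```

If adding the Debezium transform to the sink worker is not an option, Kafka Connect's built-in `org.apache.kafka.connect.transforms.ExtractField$Value` with `field=after` should give a similar result for inserts and updates, though it does not treat delete events specially.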

For example, this would be the complete record after the transformation:

    {
        "schema":{
            "type":"struct",
            "fields":[
                {
                    "type":"int32",
                    "optional":false,
                    "field":"id"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"name"
                },
                {
                    "type":"string",
                    "optional":true,
                    "field":"nickname"
                }
            ],
            "name":"localdb_postgres.public.persons.Value"
        },
        "payload":{
            "id":3,
            "name":"Ko Youngrok",
            "nickname":"SuperMen"
        }
    }
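Once the record has this flat shape, keep in mind that the event in the question is an update (`"op":"u"`), and with `insert.mode=insert` the sink issues a plain INSERT for it, which can fail on the primary key if the row already exists in the target table. A hedged sketch of sink settings that avoid this, assuming the record key produced by Debezium carries the `id` column and is read with a schema-aware converter:

```properties
# Sketch: write flattened change events idempotently.
insert.mode=upsert
# Take the primary key from the Kafka record key (assumed to contain "id").
pk.mode=record_key
pk.fields=id
```

Whether this is needed depends on how the target table is populated; it is an optional adjustment, not part of the extraction itself.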