无法使用 debezium 作为源和 confluent jdbc 接收器连接器获取目标数据库中的删除更改

unable to get delete changes in destination db using debezium as source and confluent jdbc sink connector

我用来下载接收器连接器和 mysql jdbc 驱动程序的 Dockerfile

FROM debezium/connect
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION 5.1.39
ARG KAFKA_JDBC_VERSION=5.5.0
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
    | tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar

我在我当前的工作目录中保留了这个名为 Dockerfile 的没有扩展名的文件,并使用下面的命令来构建 docker 图像

docker build . --tag kafka-connect-sink
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
docker run -it --rm --name mysql -p 3307:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql


docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h192.168.99.102 -P3307 -uroot -pdebezium'



docker run -it --rm --name mysqldest -p 3308:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql


docker run -it --rm --name mysqltermdest --link mysqldest --rm mysql:5.7 sh -c 'exec mysql -h192.168.99.102 -P3308 -uroot -pdebezium'
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql --link mysqldest:mysqldest kafka-connect-sink
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "root", "database.password": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.test" } }'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{ "name": "inventory-connector-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://mysqldest:3306/inventory?useSSL=false","connection.user": "root", "connection.password": "debezium", "topics": "dbserver1.inventory.customers", "table.name.format": "inventory.customers" ,"auto.create": "true" ,"auto.evolve":"true","delete.enabled":"true", "insert.mode": "upsert", "pk.fields": "id", "pk.mode": "record_key" ,"transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope" } }'

注意:要删除不需要的配置文件,我们可以在下面使用

curl -X DELETE 192.168.99.102:8083/connectors/inventory-connector-sink
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka watch-topic -a -k dbserver1.inventory.customers

一切正常,唯一的问题是,如果我删除源数据库中的任何行,它不会反映在目标数据库中,尽管我使用 delete.enable 为真,pk.mode 为 record_key

请将 https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-drop-tombstones to true or https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-delete-handling-mode 设置为 none

这两个选项中的任何一个都将确保接收器连接器将接收到作为删除指示器的逻辑删除事件。如果不存在,则删除记录被过滤掉。