Kafka Debezium 连接器工作但不将数据摄取到 PostgreSQL

Kafka Debezium Connector Working But not ingesting data into PostgreSQL

我已经使用 debezium-connector-postgres-1.8 成功地将 postgresql 连接到 kafka。0.Final-plugin connector.Below 是我的 Standalone.properties 文件:

rest.port=8084
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/home/abc/debezium-connector-postgres-1.8.0.Final-plugin/debezium-connector-postgres/debezium-connector-postgres-1.8.0.Final.jar,


############################# Zookeeper #############################


zookeeper.connect=localhost:2181

我的连接器文件如下:

name= inventory_db-connector
connector.class= io.debezium.connector.postgresql.PostgresConnector
tasks.max= 1
database.hostname= localhost
database.port= 5432
database.user= user
database.password= pass
database.dbname = testdb
database.server.name= dbserver1
database.whitelist= testdb
database.history.kafka.bootstrap.servers= kafka=9092
database.history.kafka.topic= schema-changes.inventory
topics=inventory
topic.inventory.inventory.conditions.mapping=time=value.time,device_id=value.device_id,temperature=value.temperature,humidity=value.humidity 

通过以下命令在终端运行成功kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties /home/abc/inventory_db-connector.

我的 inventory.txt 包含我想以 JSON 格式在数据库中摄取的数据。 我还检查了 kafka-consumer 的库存主题,它显示数据已加载:

{"time":"2022-02-08 16:18:00-05", "device_id":"weather-pro001000", "temperature":"200", "humidity":"200"}

现在我们正尝试使用以下命令将数据提取到 postgres 中 cat inventory.txt | kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:"; 但是现在的问题是数据没有插入到数据库中,命令执行了 没有显示任何错误,我的日志也是空的。 我似乎无法弄清楚这个问题。

Debezium 连接器是一个源连接器,即它用于将数据从 postgresSQL 读取到 kafka。

如果您想将数据导入 postgres sql 尝试使用 JDBC 接收器连接器

https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html