Kafka JDBC 接收器连接器未访问正确的连接器配置

Kafka JDBC sink connector does not access the correct connector configurations

我在 docker 容器中的 pi 上有一个 postgres 运行。 Debezium 连接器在我的本地机器上是 运行(与 zookeeper 和 kafka 相同)。

kafka 主题已启动,运行 我可以看到我在 postgres 中所做的更改进入 kafka 主题。到目前为止一切顺利。

现在我在本地启动了另一个 docker 容器,它与我的其他容器不是来自同一个 docker 组合文件。这应该是我的副本数据库。

我将 confluentinc-kafka-connect-jdbc-10.5.0 复制到 docker 容器中。

sudo docker cp confluentinc-kafka-connect-jdbc-10.5.0 CONTAINER_ID:/kafka/connect/ 

更改了用户和用户组并重新声明了容器。

docker exec -it --user root <container-id> /bin/bash
chown -R <username>:<groupname> <folder/file>

现在我创建了 jdbc-sink 连接器。

curl --location --request POST 'http://localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "piserver.public.customers",
        "connection.url": "jdbc:postgresql:192.168.128.2:5432/postgres",
        "connection.user": "postgres",
        "connection.password": "postgres",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",   
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"                                               
    }
}
'

我找回了创建的 201。

我得到的错误是在 运行 几秒后:

curl --location --request GET 'localhost:8083/connectors/jdbc-sink/status' \
--data-raw ''

错误跟踪

{
            "id": 0,
            "state": "FAILED",
            "worker_id": "192.168.112.4:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\n\t... 10 more\nCaused by: org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:319)\n\tat org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)\n\tat org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)\n\tat org.postgresql.Driver.makeConnection(Driver.java:400)\n\tat org.postgresql.Driver.connect(Driver.java:259)\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677)\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)\n\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:250)\n\tat io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getConnection(PostgreSqlDatabaseDialect.java:103)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)\n\t... 13 more\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)\n\tat java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.postgresql.core.PGStream.createSocket(PGStream.java:241)\n\tat org.postgresql.core.PGStream.<init>(PGStream.java:98)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:109)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:235)\n\t... 23 more\n"
        }

简短: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.

我在配置中尝试的主机:

"connection.url": "jdbc:postgresql:192.168.128.2:5432/postgres" // got this IP from docker inspect POSTGRES_CONTAINER
"connection.url": "jdbc:postgresql:host.docker.internal:5432/postgres" 
"connection.url": "jdbc:postgresql:localhost:5432/postgres" 

None 其中有效

总是遇到同样的错误,无法访问 localhost:5432。

还尝试将 docker postgres 容器(副本)连接到我的 docker-compose 网络。

对此有任何想法谢谢。

小简历。

POSTGRES(在 PI 上)->DEBEZIUM 连接器(本地)-->KAFKA-> JDBC-KAFKA 中的 SINK -> POSTGRES(将是副本,在本地运行)

不要在容器之间使用 IP 地址,也不要在容器内使用本地主机来尝试访问其他容器 - https://docs.docker.com/network/bridge/

最好使用DockerCompose启动所有服务,否则需要自己创建网桥

docker network create database-bridge
docker run --network=database-bridge --name=postgres ...  
docker run --network=database-bridge  ... # repeat for zookeeper, kafka, and debezium

或者查看创建的网络,并将新容器附加到该网络,因为你说

started another docker container locally which is not from the same docker compose file

docker network ls  # look for a name that matches the folder where you ran docker-compose
docker run --network=<name> ... jdbc-connector

然后使用 jdbc:postgresql://postgres:5432/postgres 通过主机名连接到该容器。

如果 JDBC 连接器是 运行 connect-distributed.sh 而不是 Docker,只有这样你才能使用 localhost:5432,但你需要一个来自Postgres 容器到主机。