Kafka JDBC sink connector does not access the correct connector configurations
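I have a Postgres instance running in a Docker container on a Pi.
The Debezium connector is running on my local machine (as are ZooKeeper and Kafka).
The Kafka topic is up and running, and I can see the changes I make in Postgres arriving in the Kafka topic. So far so good.
Now I started another Docker container locally, which is not from the same docker-compose file as my other containers. This is meant to be my replica database.
I copied confluentinc-kafka-connect-jdbc-10.5.0 into the Docker container.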
sudo docker cp confluentinc-kafka-connect-jdbc-10.5.0 CONTAINER_ID:/kafka/connect/
Changed the user and group, then restarted the container.
docker exec -it --user root <container-id> /bin/bash
chown -R <username>:<groupname> <folder/file>
Then I created the jdbc-sink connector.
curl --location --request POST 'http://localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "piserver.public.customers",
"connection.url": "jdbc:postgresql:192.168.128.2:5432/postgres",
"connection.user": "postgres",
"connection.password": "postgres",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}
'
I got back a 201 Created.
The error appears after the connector has been running for a few seconds:
curl --location --request GET 'localhost:8083/connectors/jdbc-sink/status' \
--data-raw ''
Error trace:
{
"id": 0,
"state": "FAILED",
"worker_id": "192.168.112.4:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\n\t... 10 more\nCaused by: org.postgresql.util.PSQLException: Connection to localhost:5432 refused. 
Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:319)\n\tat org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)\n\tat org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)\n\tat org.postgresql.Driver.makeConnection(Driver.java:400)\n\tat org.postgresql.Driver.connect(Driver.java:259)\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677)\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)\n\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:250)\n\tat io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getConnection(PostgreSqlDatabaseDialect.java:103)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)\n\t... 13 more\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)\n\tat java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.postgresql.core.PGStream.createSocket(PGStream.java:241)\n\tat org.postgresql.core.PGStream.<init>(PGStream.java:98)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:109)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:235)\n\t... 23 more\n"
}
In short:
Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
Hosts I tried in the config:
"connection.url": "jdbc:postgresql:192.168.128.2:5432/postgres" // got this IP from docker inspect POSTGRES_CONTAINER
"connection.url": "jdbc:postgresql:host.docker.internal:5432/postgres"
"connection.url": "jdbc:postgresql:localhost:5432/postgres"
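None of them worked.
I always get the same error: localhost:5432 cannot be reached.
I also tried connecting the Docker Postgres container (the replica) to my docker-compose network.
Any ideas would be appreciated. Thanks.
Short summary:
POSTGRES (on the Pi) -> DEBEZIUM connector (local) -> KAFKA -> JDBC SINK in KAFKA -> POSTGRES (will be the replica, running locally)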
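Don't use IP addresses between containers, and don't use localhost inside a container to try to reach other containers - https://docs.docker.com/network/bridge/
Ideally, start all services with Docker Compose; otherwise you need to create the bridge network yourself: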
docker network create database-bridge
docker run --network=database-bridge --name=postgres ...
docker run --network=database-bridge ... # repeat for zookeeper, kafka, and debezium
Or look up the network that Compose already created and attach the new container to it, since you said
started another docker container locally which is not from the same docker compose file
docker network ls # look for a name that matches the folder where you ran docker-compose
docker run --network=<name> ... jdbc-connector
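If the replica container is already running, it may be simpler to attach it to the existing network than to recreate it. The following is a minimal sketch, not a definitive recipe: attach_container is a hypothetical helper name, and the network, container, and alias values are whatever applies on your machine. It relies only on docker network connect (with its --alias option) and docker network inspect.

```shell
# attach_container NETWORK CONTAINER ALIAS   (hypothetical helper name)
# Joins an already-running container to an existing network under a DNS alias,
# then prints the names of all containers attached to that network.
attach_container() {
  docker network connect --alias "$3" "$1" "$2" &&
    docker network inspect "$1" --format '{{range .Containers}}{{.Name}} {{end}}'
}
```

For example, attach_container myproject_default postgres-replica postgres (all three names are assumptions) would make the replica resolvable as postgres from the other containers on that network.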
Then use jdbc:postgresql://postgres:5432/postgres to connect to that container by its hostname.
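Note the // after jdbc:postgresql: in that URL. As far as I can tell from the PostgreSQL JDBC driver's documented URL forms, a URL without // (such as the connection.url values in the question) is read as jdbc:postgresql:database, with the host defaulting to localhost, which would explain why the trace reports localhost:5432 no matter which host was tried. A small sketch to check the form of a URL before posting the config; has_explicit_host is a hypothetical helper name, not part of any tool:

```shell
# has_explicit_host URL   (hypothetical helper name)
# Returns 0 when the URL uses the jdbc:postgresql://host... form, 1 otherwise.
has_explicit_host() {
  case "$1" in
    jdbc:postgresql://?*) return 0 ;;
    *) return 1 ;;
  esac
}

# Compare the URL from the question with the hostname-based one.
for url in \
  'jdbc:postgresql:192.168.128.2:5432/postgres' \
  'jdbc:postgresql://postgres:5432/postgres'
do
  if has_explicit_host "$url"; then
    echo "ok:      $url"
  else
    echo "no host: $url"
  fi
done
```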
Only if the JDBC connector is running via connect-distributed.sh rather than in Docker can you use localhost:5432, but then you need a port mapping from the Postgres container to the host.
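To apply a changed connection URL to the connector that already exists, the Kafka Connect REST API accepts a PUT on /connectors/<name>/config. The sketch below assumes the connector name jdbc-sink and the settings from the question; update_sink_config and sink_status are hypothetical helper names, and the Connect address and JDBC URL are passed in as arguments rather than assumed.

```shell
# update_sink_config CONNECT_URL JDBC_URL   (hypothetical helper name)
# PUT /connectors/<name>/config replaces the whole config, so every key is resent.
update_sink_config() {
  curl --silent --request PUT "$1/connectors/jdbc-sink/config" \
    --header 'Accept: application/json' \
    --header 'Content-Type: application/json' \
    --data-raw '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "topics": "piserver.public.customers",
  "connection.url": "'"$2"'",
  "connection.user": "postgres",
  "connection.password": "postgres",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "auto.create": "true",
  "insert.mode": "upsert",
  "pk.fields": "id",
  "pk.mode": "record_value"
}'
}

# sink_status CONNECT_URL   (hypothetical helper name)
# Fetches the connector and task state, as in the status call from the question.
sink_status() {
  curl --silent "$1/connectors/jdbc-sink/status"
}
```

For example: update_sink_config http://localhost:8083 'jdbc:postgresql://postgres:5432/postgres', followed by sink_status http://localhost:8083 to see whether the task leaves the FAILED state.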