Debezium source task fails to reconnect to postgresql DB when DB container is re-created
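We have a Kubernetes cluster with Debezium running as a source task that reads from PostgreSQL and writes to Kafka. Debezium, Postgres and Kafka all run in separate pods.
When the Postgres pod is deleted and Kubernetes re-creates it, the Debezium pod fails to reconnect.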
Logs from the Debezium pod:
2018-07-17 08:31:38,311 ERROR || WorkerSourceTask{id=inventory-connector-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
2018-07-17 08:31:38,311 INFO || [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer]
Debezium keeps trying to periodically flush outstanding messages, but fails with the following exception:
2018-07-17 08:32:38,167 ERROR || WorkerSourceTask{id=inventory-connector-0} Exception thrown while calling task.commit() [org.apache.kafka.connect.runtime.WorkerSourceTask]
org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.commit(PostgresConnectorTask.java:138)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:437)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:378)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:108)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:45)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.run(SourceTaskOffsetCommitter.java:82)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:942)
at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:23)
at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:176)
at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:99)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.doFlushLsn(PostgresReplicationConnection.java:246)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.flushLsn(PostgresReplicationConnection.java:239)
at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:146)
... 13 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.postgresql.core.PGStream.flush(PGStream.java:553)
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:939)
... 19 more
Is there a way to make Debezium re-establish the connection to Postgres once it becomes available again?
Or am I missing some configuration?
- Debezium version 0.8
- Kubernetes version 1.10.3
- Postgres version 9.6
It looks like this is a common issue, and there are open feature requests in both Debezium and Kafka:
https://issues.jboss.org/browse/DBZ-248
https://issues.apache.org/jira/browse/KAFKA-5352
While these are still open, it looks like this is the expected behavior.
As a workaround, I've added this liveness probe to the deployment:
livenessProbe:
  exec:
    command:
    - sh
    - -ec
    - ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/'); reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l); if [ $reply -lt 2 ]; then exit 1; fi;
  initialDelaySeconds: 30
  periodSeconds: 5
The first clause gets the container IP address:
ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/');
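The `grep -A2 | tail -n1` chain relies on the `inet` line sitting exactly two lines below the last `state UP` header. As a sketch, assuming iproute2-style `ip addr` output, a single awk pass can track which interface block it is in instead. `up_ipv4` is a hypothetical helper name. It picks the first UP interface rather than the last one, which makes no difference in a pod with a single `eth0`.

```shell
#!/bin/sh
# Hypothetical helper (not part of Debezium or Kubernetes): read `ip addr`
# output on stdin and print the IPv4 address of the first interface whose
# header line reports "state UP".
up_ipv4() {
  awk '
    # Interface header lines look like "3: eth0@if4: <...> ... state UP ..."
    /^[0-9]+: / { up = ($0 ~ /state UP/) }
    # Indented "inet 10.244.0.5/24 ..." lines: strip the prefix length and stop
    up && $1 == "inet" { split($2, addr, "/"); print addr[1]; exit }
  '
}

# Possible use inside the probe: ipaddress=$(ip addr | up_ipv4)
```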
The second clause makes the request and counts the occurrences of 'RUNNING' in the JSON response:
reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l);
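The Kafka Connect status endpoint reports one `state` for the connector itself and one for each task. A healthy single-task connector therefore yields two `RUNNING` matches. The sketch below matches the whole `"state":"RUNNING"` pair rather than the bare word, so a `RUNNING` that happens to appear inside a failed task's `trace` string is not counted. `count_running` is a hypothetical helper, and the payload is an abbreviated, made-up example of the response shape.

```shell
#!/bin/sh
# Hypothetical helper: count "state":"RUNNING" pairs in a Kafka Connect
# status document read on stdin. Whitespace around the colon is tolerated.
count_running() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"RUNNING"' | wc -l
}

# Abbreviated, made-up payload for a healthy connector with one task
healthy='{"name":"inventory-connector","connector":{"state":"RUNNING","worker_id":"10.244.0.5:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.244.0.5:8083"}]}'
printf '%s\n' "$healthy" | count_running
```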
The third clause returns exit code 1 if 'RUNNING' appears fewer than two times:
if [ $reply -lt 2 ]; then exit 1; fi
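The hard-coded `2` assumes the connector has exactly one task. Consider two tasks where one has failed: the connector and the surviving task still give two `RUNNING` entries, so the probe passes. One possible generalization compares the number of `RUNNING` states with the total number of `state` fields, so the check no longer depends on `tasks.max`. This is a sketch only, and `connector_healthy` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: read a Kafka Connect status document on stdin and
# succeed only if the connector and every one of its tasks report RUNNING.
connector_healthy() {
  json=$(cat)
  # Every "state" key: one for the connector plus one per task
  total=$(printf '%s\n' "$json" | grep -o '"state"[[:space:]]*:' | wc -l)
  running=$(printf '%s\n' "$json" | grep -o '"state"[[:space:]]*:[[:space:]]*"RUNNING"' | wc -l)
  # Require the connector plus at least one task, and all of them RUNNING
  [ "$total" -ge 2 ] && [ "$running" -eq "$total" ]
}

# Possible probe body (the exit status of the pipeline becomes the probe result):
# curl -s "$ipaddress:8083/connectors/inventory-connector/status" | connector_healthy
```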
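It seems to work in initial testing, i.e. restarting the Postgres database triggers a restart of the Debezium container. I imagine a script like this (though perhaps 'robustified') could be included in the image to make probing easier.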