如何使用 debezium 从 Postgres 流式传输更改
How to stream changes from Postgres using debezium
使用 debezium 从 Postgres 流式传输更改
设置已完成:
- Docker设置。
- 启动了 Postgres、zookeeper、kafka,然后启动了 debezium 连接器。
- 使用 decoderbufs、wal2json(postgres) 设置远程数据库。
- 正在使用 curl 连接到 debezium。
- 创建了观察者。
问题:当我启动 watcher 时,它正在读取之前发生的所有更改,但是当任何插入完成时,kafka 都会向 debezium 抛出异常,说“An exception occurred in the change event producer. This connector will be stopped.
”并且在 watcher none 中显示.
由于我对这些概念很陌生,无法弄清楚我在环境设置中遗漏了什么,这是我在堆栈溢出中的第一个问题,请忽略我的错误。
主要问题是我的本地数据库运行良好。
有人可以帮忙吗?
提前致谢
019-05-02 14:09:47,242 WARN Postgres|kafkaserver|records-stream-producer Closing replication stream due to db connection IO exception... [io.debezium.connector.postgresql.RecordsStreamProducer]
2019-05-02 14:09:47,365 INFO || WorkerSourceTask{id=kafka-public-connector-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,366 INFO || WorkerSourceTask{id=kafka-public-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,375 ERROR || WorkerSourceTask{id=kafka-public-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1037)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.read(PostgresReplicationConnection.java:251)
at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start[=10=](RecordsStreamProducer.java:120)
... 5 more
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:308)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1079)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1035)
... 12 more
2019-05-02 14:09:47,387 ERROR || WorkerSourceTask{id=kafka-public-connector-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
- 是否有解决此问题的完整指南?
- 主要目标是,我有一个包含大量数据的数据库,其中一个
应用程序(生产者)从另一台服务器获取数据并保存
我们自己的数据库和另一个应用程序(消费者)中的所有数据都是为了获取
存储并应用业务逻辑和前端。我在这里
想要将来自另一个应用程序(消费者)的命中替换为数据库
这个 debezium 和 kafka 部分。
- 或者有什么方法可以做到这一点。
感谢大家。以上问题解决。远程数据库中设置的实际问题。很少有其他依赖项没有正确安装,如 postgis、protobuf-c、decoderbufs 一旦正确安装问题就解决了。
对我来说,我检查了 rds 数据库实例的 replication_slot 大小,它是不活动的。我使用下面的查询来获取 replication_slot
的状态
select slot_name, pg_size_pretty(pg_xlog_location_diff(pg_current_xlog_location(),restart_lsn)) as replicationSlotLag, active from pg_replication_slots ;
如果 actice 列的值为 'f',则表示它处于非活动状态并且连接器无法连接到数据库。由于复制槽已经处于非活动状态,我使用以下语句删除了它
select pg_drop_replication_slot('your_slot_name');
之后重新启动连接器解决了这个错误。
使用 debezium 从 Postgres 流式传输更改
设置已完成:
- Docker设置。
- 启动了 Postgres、zookeeper、kafka,然后启动了 debezium 连接器。
- 使用 decoderbufs、wal2json(postgres) 设置远程数据库。
- 正在使用 curl 连接到 debezium。
- 创建了观察者。
问题:当我启动 watcher 时,它正在读取之前发生的所有更改,但是当任何插入完成时,kafka 都会向 debezium 抛出异常,说“An exception occurred in the change event producer. This connector will be stopped.
”并且在 watcher none 中显示.
由于我对这些概念很陌生,无法弄清楚我在环境设置中遗漏了什么,这是我在堆栈溢出中的第一个问题,请忽略我的错误。
主要问题是我的本地数据库运行良好。
有人可以帮忙吗?
提前致谢
019-05-02 14:09:47,242 WARN Postgres|kafkaserver|records-stream-producer Closing replication stream due to db connection IO exception... [io.debezium.connector.postgresql.RecordsStreamProducer]
2019-05-02 14:09:47,365 INFO || WorkerSourceTask{id=kafka-public-connector-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,366 INFO || WorkerSourceTask{id=kafka-public-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,375 ERROR || WorkerSourceTask{id=kafka-public-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1037)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.read(PostgresReplicationConnection.java:251)
at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start[=10=](RecordsStreamProducer.java:120)
... 5 more
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:308)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1079)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1035)
... 12 more
2019-05-02 14:09:47,387 ERROR || WorkerSourceTask{id=kafka-public-connector-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
- 是否有解决此问题的完整指南?
- 主要目标是,我有一个包含大量数据的数据库,其中一个 应用程序(生产者)从另一台服务器获取数据并保存 我们自己的数据库和另一个应用程序(消费者)中的所有数据都是为了获取 存储并应用业务逻辑和前端。我在这里 想要将来自另一个应用程序(消费者)的命中替换为数据库 这个 debezium 和 kafka 部分。
- 或者有什么方法可以做到这一点。
感谢大家。以上问题解决。远程数据库中设置的实际问题。很少有其他依赖项没有正确安装,如 postgis、protobuf-c、decoderbufs 一旦正确安装问题就解决了。
对我来说,我检查了 rds 数据库实例的 replication_slot 大小,它是不活动的。我使用下面的查询来获取 replication_slot
的状态select slot_name, pg_size_pretty(pg_xlog_location_diff(pg_current_xlog_location(),restart_lsn)) as replicationSlotLag, active from pg_replication_slots ;
如果 actice 列的值为 'f',则表示它处于非活动状态并且连接器无法连接到数据库。由于复制槽已经处于非活动状态,我使用以下语句删除了它
select pg_drop_replication_slot('your_slot_name');
之后重新启动连接器解决了这个错误。