Hazelcast Change Data Capture with Postgres
I am trying to use CDC with my Postgres database.
I created a simple project based on the example from the Hazelcast documentation:
https://jet-start.sh/docs/tutorials/cdc-postgres
public static void main(String[] args) {
    StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
            .setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
            .setDatabaseUser("postgres").setDatabasePassword("root")
            .setDatabaseName("postgres").setTableWhitelist("inventory.customers")
            .build();
    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(source).withoutTimestamps().peek()
            .writeTo(CdcSinks.map("customers",
                    r -> r.key().toMap().get("id"),
                    r -> r.value().toObject(Customer.class).toString()));
    JobConfig cfg = new JobConfig().setName("postgres-monitor");
    Jet.bootstrappedInstance().newJob(pipeline, cfg);
}
When I run the application, I get the following error:
13:55:13.679 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.PostgresConnectorTask - user 'postgres' connected to database 'postgres' on PostgreSQL 13.2, compiled by Visual C++ build 1914, 64-bit with roles:
role 'pg_read_all_settings' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_stat_scan_tables' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_write_server_files' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_monitor' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_read_server_files' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'postgres1' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: true]
role 'pg_execute_server_program' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_read_all_stats' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]
13:55:13.691 [hz.happy_poitras.jet.blocking.thread-0] DEBUG io.debezium.connector.postgresql.connection.PostgresConnection - No replication slot 'debezium' is present for plugin 'decoderbufs' and database 'postgres'
13:55:13.691 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.connection.PostgresConnection - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=null, catalogXmin=null]
13:55:13.691 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.PostgresConnectorTask - No previous offset found
13:55:13.691 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.snapshot.ExportedSnapshotter - Taking exported snapshot for new datasource
13:55:13.698 [hz.happy_poitras.jet.blocking.thread-0] DEBUG io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Creating new replication slot 'debezium' for plugin 'DECODERBUFS'
13:55:13.737 [hz.happy_poitras.jet.blocking.thread-0] DEBUG io.debezium.jdbc.JdbcConnection - Connected to jdbc:postgresql://127.0.0.1:5432/postgres with {replication=database, server.name=1265b920-e764-4824-93f0-704e53ede9ea, history=com.hazelcast.jet.cdc.impl.CdcSourceP$DatabaseHistoryImpl, password=***, assumeMinServerVersion=9.4, user=postgres, preferQueryMode=simple}
13:55:13.738 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Creating replication slot with command CREATE_REPLICATION_SLOT debezium LOGICAL decoderbufs
13:55:13.747 [ WARN] [c.h.j.i.e.TaskletExecutionService] Exception in ProcessorTasklet{postgres-monitor/source#0}
com.hazelcast.jet.JetException: Failed to connect to database: Creation of replication slot failed: ERROR: logical decoding requires wal_level >= logical
at com.hazelcast.jet.cdc.impl.CdcSourceP.handleConnectException(CdcSourceP.java:268)
at com.hazelcast.jet.cdc.impl.CdcSourceP.isConnectionUp(CdcSourceP.java:236)
at com.hazelcast.jet.cdc.impl.CdcSourceP.complete(CdcSourceP.java:148)
at com.hazelcast.jet.impl.processor.ProcessorWrapper.complete(ProcessorWrapper.java:116)
at com.hazelcast.jet.impl.execution.ProcessorTasklet.complete(ProcessorTasklet.java:472)
at com.hazelcast.jet.impl.execution.ProcessorTasklet.stateMachineStep(ProcessorTasklet.java:372)
at com.hazelcast.jet.impl.execution.ProcessorTasklet.call(ProcessorTasklet.java:247)
at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:294)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
13:55:13.776 [ERROR] [c.h.j.i.MasterJobContext] Execution of job 'postgres-monitor', execution 06f2-6f35-27c1-0001 failed
Start time: 2021-10-13T13:55:12.865
Duration: 00:00:00.910
To see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion
com.hazelcast.jet.JetException: Exception in ProcessorTasklet{postgres-monitor/source#0}: com.hazelcast.jet.JetException: Failed to connect to database: Creation of replication slot failed: ERROR: logical decoding requires wal_level >= logical
at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:305)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: com.hazelcast.jet.JetException: Failed to connect to database: Creation of replication slot failed: ERROR: logical decoding requires wal_level >= logical
at com.hazelcast.jet.cdc.impl.CdcSourceP.handleConnectException(CdcSourceP.java:268)
Do I need to configure anything at the database level? (I am using my local database.)
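The message says:
logical decoding requires wal_level >= logical
In postgresql.conf, you should set the following:
wal_level = logical
Then restart the PostgreSQL server, because wal_level only takes effect at server start.
For all the Postgres configuration options required for CDC, see the operations guide section on CDC.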
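To confirm that the setting has taken effect before starting the Jet job again, you can ask the server directly. The following is only a minimal sketch: the class name WalLevelCheck and its two helper methods are made up for illustration, the connection details are copied from the question, and it assumes the PostgreSQL JDBC driver is on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper, not part of Hazelcast Jet or Debezium.
final class WalLevelCheck {

    // Logical decoding needs wal_level to be exactly "logical";
    // the lower levels "minimal" and "replica" are not enough.
    static boolean supportsLogicalDecoding(String walLevel) {
        return walLevel != null && "logical".equalsIgnoreCase(walLevel.trim());
    }

    // Reads the value currently in effect on the server.
    static String currentWalLevel(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW wal_level")) {
            if (!rs.next()) {
                throw new SQLException("SHOW wal_level returned no rows");
            }
            return rs.getString(1);
        }
    }

    public static void main(String[] args) throws SQLException {
        // Assumption: same host, port, database and credentials as in the question.
        String url = "jdbc:postgresql://127.0.0.1:5432/postgres";
        try (Connection conn = DriverManager.getConnection(url, "postgres", "root")) {
            String level = currentWalLevel(conn);
            if (supportsLogicalDecoding(level)) {
                System.out.println("wal_level = " + level + ", logical decoding is possible");
            } else {
                System.out.println("wal_level = " + level
                        + ", set wal_level = logical in postgresql.conf and restart the server");
            }
        }
    }
}
```

If this still reports a lower level after you edited postgresql.conf, the server has most likely not been restarted yet, or a different configuration file is in use.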