Hazelcast 使用 Postgres 捕获变更数据

Hazelcast Change Data Capture with Postgres

我正在尝试将 CDC 用于我的 Postgres 数据库。

并且我使用 Hazelcast 文档示例创建了简单的项目。

https://jet-start.sh/docs/tutorials/cdc-postgres

public static void main(String[] args) {
        StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source").setDatabaseAddress("127.0.0.1")
                .setDatabasePort(5432).setDatabaseUser("postgres").setDatabasePassword("root")
                .setDatabaseName("postgres").setTableWhitelist("inventory.customers").build();

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(source).withoutTimestamps().peek().writeTo(CdcSinks.map("customers",
                r -> r.key().toMap().get("id"), r -> r.value().toObject(Customer.class).toString()));

        JobConfig cfg = new JobConfig().setName("postgres-monitor");
        Jet.bootstrappedInstance().newJob(pipeline, cfg);
    }

当我 运行 应用程序出现以下错误时..

13:55:13.679 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.PostgresConnectorTask - user 'postgres' connected to database 'postgres' on PostgreSQL 13.2, compiled by Visual C++ build 1914, 64-bit with roles:
    role 'pg_read_all_settings' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_stat_scan_tables' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_write_server_files' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_monitor' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_read_server_files' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'postgres1' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: true]
    role 'pg_execute_server_program' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_read_all_stats' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]
13:55:13.691 [hz.happy_poitras.jet.blocking.thread-0] DEBUG io.debezium.connector.postgresql.connection.PostgresConnection - No replication slot 'debezium' is present for plugin 'decoderbufs' and database 'postgres'
13:55:13.691 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.connection.PostgresConnection - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=null, catalogXmin=null]
13:55:13.691 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.PostgresConnectorTask - No previous offset found
13:55:13.691 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.snapshot.ExportedSnapshotter - Taking exported snapshot for new datasource
13:55:13.698 [hz.happy_poitras.jet.blocking.thread-0] DEBUG io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Creating new replication slot 'debezium' for plugin 'DECODERBUFS'
13:55:13.737 [hz.happy_poitras.jet.blocking.thread-0] DEBUG io.debezium.jdbc.JdbcConnection - Connected to jdbc:postgresql://127.0.0.1:5432/postgres with {replication=database, server.name=1265b920-e764-4824-93f0-704e53ede9ea, history=com.hazelcast.jet.cdc.impl.CdcSourceP$DatabaseHistoryImpl, password=***, assumeMinServerVersion=9.4, user=postgres, preferQueryMode=simple}
13:55:13.738 [hz.happy_poitras.jet.blocking.thread-0] INFO io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Creating replication slot with command CREATE_REPLICATION_SLOT debezium  LOGICAL decoderbufs
13:55:13.747 [[33m WARN[0m] [[34mc.h.j.i.e.TaskletExecutionService[0m] Exception in ProcessorTasklet{postgres-monitor/source#0}
[31mcom.hazelcast.jet.JetException: Failed to connect to database: Creation of replication slot failed: ERROR: logical decoding requires wal_level >= logical
    at com.hazelcast.jet.cdc.impl.CdcSourceP.handleConnectException(CdcSourceP.java:268)
    at com.hazelcast.jet.cdc.impl.CdcSourceP.isConnectionUp(CdcSourceP.java:236)
    at com.hazelcast.jet.cdc.impl.CdcSourceP.complete(CdcSourceP.java:148)
    at com.hazelcast.jet.impl.processor.ProcessorWrapper.complete(ProcessorWrapper.java:116)
    at com.hazelcast.jet.impl.execution.ProcessorTasklet.complete(ProcessorTasklet.java:472)
    at com.hazelcast.jet.impl.execution.ProcessorTasklet.stateMachineStep(ProcessorTasklet.java:372)
    at com.hazelcast.jet.impl.execution.ProcessorTasklet.call(ProcessorTasklet.java:247)
    at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:294)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)
[0m13:55:13.776 [[31mERROR[0m] [[34mc.h.j.i.MasterJobContext[0m] Execution of job 'postgres-monitor', execution 06f2-6f35-27c1-0001 failed
    Start time: 2021-10-13T13:55:12.865
    Duration: 00:00:00.910
    To see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion
[31mcom.hazelcast.jet.JetException: Exception in ProcessorTasklet{postgres-monitor/source#0}: com.hazelcast.jet.JetException: Failed to connect to database: Creation of replication slot failed: ERROR: logical decoding requires wal_level >= logical
    at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:305)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: com.hazelcast.jet.JetException: Failed to connect to database: Creation of replication slot failed: ERROR: logical decoding requires wal_level >= logical
    at com.hazelcast.jet.cdc.impl.CdcSourceP.handleConnectException(CdcSourceP.java:268)

是否需要在数据库级别进行任何配置。(我正在使用我的本地数据库)

消息说

logical decoding requires wal_level >= logical

postgresql.conf 中,您应该设置以下内容:

wal_level = logical

对于 CDC 所需的所有 postgres 配置选项,请参阅 operations guide section on CDC