如何在单个 Postgres 服务器中将嵌入式 Debezium 用于多个数据库?

How to use Embedded Debezium for multiple databases in a single Postgres server?

假设我们有两个微服务:service_Aservice_B
每个人在单个 Postgres 服务器实例中都有自己的数据库(分别为 db_adb_b)(这只是一个暂存环境,所以我们没有集群).

还有另一个服务,service_debezium(带有 Embedded Debezium v​​1.6.1Final)应该监听 db_adb_b。所以基本上在这个服务中配置了两个 Debezium 引擎。

但不知何故 service_debezium 无法同时监听 db_adb_b。由于某种原因,它只监听其中之一,并且没有错误日志。

此外,如果我配置 service_debezium(即它的 Debezium 引擎)来监听 db_adb_b,它会按预期工作,所以我确定它们的配置属性是正确的,并且(当只有一个引擎时)一切正常。

  1. 那么为什么我们不能使用多个 Debezium 引擎来监听单个 Postgres 服务器中的多个数据库呢?我在这里错过了什么?
  2. 我认为的另一种选择是只使用一个 Debezium 引擎来侦听该 Postgres 服务器实例中的所有数据库,但显然它的配置需要 database.dbname 所以我想首选的方法是定义一个新的 Debezium每个数据库的引擎。对吗?

以下是 service_debezium 中的 Debezium 配置:

  @Bean
  public io.debezium.config.Configuration dbAConnector() throws IOException {
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
        .with("name", "db_a_connector")
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", "localhost")
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "*****")
        .with("database.dbname", "db_a")
        .with("table.whitelist", "public.dummy_table,public.another_dummy_table")
        .with("plugin.name", "pgoutput")
        .with("slot.name", "db_a_connector")
        .with("database.server.name", "db_a_server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  }
  @Bean
  public io.debezium.config.Configuration dbBConnector() throws IOException {
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
        .with("name", "db_b_connector")
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", localhost)
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "*****")
        .with("database.dbname", "db_b")
        .with("table.whitelist", "public.yet_another_table")
        .with("plugin.name", "pgoutput")
        .with("slot.name", "db_b_connector")
        .with("database.server.name", "db_b_server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  }

以下是我在 service_debezium 中创建 Debezium 引擎实例的方法:

@Component
public class DBAListener {

  public DBAListener(
      @Qualifier("dbAConnector") Configuration connectorConfiguration /*, ... other services */) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(connectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();

    // ...
  }

  // ...

}
@Component
public class DBBListener {

  public DBBListener(
      @Qualifier("dbBConnector") Configuration connectorConfiguration /*, ... other services */) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(connectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();

    // ...
  }

  // ...

}
# ...other conf

wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
# max number of walsender processes (i.e. # of replica databases + # of Debezium engines) (change requires restart)
# Abrupt streaming client disconnection might cause an orphaned connection slot until a timeout is reached, 
# so this parameter should be set slightly higher than the maximum number of expected clients 
# so disconnected clients can immediately reconnect
max_wal_senders = 3
max_replication_slots = 3       # max number of replication slots (change requires restart)

提前致谢!

UPDATE:根据 .

更新了代码片段以提供完整示例

当您创建 debezium 连接器时,它会创建一个默认名称为“debezium”的复制槽。然后你尝试创建另一个实例并尝试创建一个具有相同名称的复制槽并且不能使用相同的复制槽同时使用两个实例,这将抛出错误。这是一个糟糕的解释,但我会给出解决方案。

在每个连接器上添加此配置:

在 dbAConnector 上

.with("slot.name", "dbAConnector")

和 dbBConnector

.with("slot.name", "dbBConnector")

您可以列出可用的复制槽:

SELECT * FROM pg_replication_slots;

并且您可以删除未使用的复制槽,例如默认名称“debezium”:

SELECT pg_drop_replication_slot('debezium');

因为会在没有人使用这个插槽时使用磁盘。