CDC 未公开的其他唯一索引引用列导致异常

Additional unique index referencing columns not exposed by CDC causes exception

我正在使用 SQL 连接器在 table 上捕获 CDC,我们只公开 table 上所有列的一个子集。 table 有两个唯一索引 A 和 B。这两个索引都没有标记为 PRIMARY INDEX,但索引 A 在逻辑上是我们产品中的主键,也是我想与连接器一起使用的内容。索引 B 引用了我们不向 CDC 公开的列。索引 B 在我们的产品中并没有真正用作 table 的唯一键,它只是被标记为 UNIQUE,因为它被认为是唯一的,标记它给我们带来了性能优势。

这似乎是导致以下错误的原因。我尝试使用连接器上的 message.key.columns 选项将索引 A 指定为此 table 的键,并希望忽略索引 B。但是,连接器似乎仍想对索引 B

  1. 我该如何解决这种情况?
  2. 根据我自己的理解,为什么连接器会关心引用 CDC 未公开的列的索引?
  3. 根据我自己的理解,为什么连接器关心除了 CDC table 上配置的索引之外的任何索引,即参见 CDC.change_tables.index_name 文档
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:290)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start[=10=](ChangeEventSourceCoordinator.java:119)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: The column "mynoncdccolumn" is referenced as PRIMARY KEY, but a matching column is not defined in table "mydatabase.myschema.mytable"!
    at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys[=10=](TableEditorImpl.java:105)
    at java.base/java.util.ArrayList.removeIf(ArrayList.java:1702)
    at java.base/java.util.ArrayList.removeIf(ArrayList.java:1690)
    at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:101)
    at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:254)
    at io.debezium.connector.sqlserver.SqlServerConnection.getTableSchemaFromTable(SqlServerConnection.java:428)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.getCdcTablesToQuery(SqlServerStreamingChangeEventSource.java:378)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:121)

我的连接器配置

{
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "database.user": "myuser",
  "database.password": "mspassword",
  "database.dbname": "mydb",
  "database.hostname": "mysqlserverinstance",
  "database.history.kafka.bootstrap.servers": "b-1.mycluster:9092,b-2.mycluster:9092,b-3.mycluster:9092",
  "database.history.kafka.topic": "myhistorytopic",
  "database.server.name": "myserver",
  "message.key.columns": "myschema.mytable:KeyColumn1,KeyColumn2;"
}

table定义

CREATE TABLE [myschema].[mytable]
(
[MyKeyColumn1] [int] NOT NULL,
[MyKeyColumn2] [int] NOT NULL,
[Data] [varchar] (255) NOT NULL,
[UniqueColumn1] [timestamp] NOT NULL
)
GO

CREATE UNIQUE NONCLUSTERED INDEX [IndexB] ON [myschema].[mytable] ([UniqueColumn1])
GO
CREATE UNIQUE NONCLUSTERED INDEX [IndexA] ON [myschema].[mytable] ([MyKeyColumn1], [MyKeyColumn2])
GO

Debezium 的一位贡献者似乎确认这是一个产品错误 https://gitter.im/debezium/user?at=60b8e96778e1d6477d7f40b5. I have created an issue https://issues.redhat.com/browse/DBZ-3597

编辑:

已发布并批准 PR 以解决此问题。修复在当前的 1.6 beta 快照版本中。

有一个可能的解决方法。索引的名称是问题的关键。似乎它们是按字母顺序处理的。只有第一个被考虑在内,所以如果您可以重命名您的索引以首先使用带有键的索引,那么您应该可以畅通无阻。