Debezium issue with SQL Server
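I followed the tutorial on debezium.io and adapted it to use an MS SQL Server database instead of MySQL, but the watcher does not show any events or activity when I make changes in the database. Here are the steps I took:
I ran the ZooKeeper docker command: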
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1
Then I ran the Kafka docker command:
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.1
Then I ran the SQL Server docker command with the SQL Server Agent enabled:
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=yourStrong(!)Password' -e 'MSSQL_AGENT_ENABLED=True' --name mssql -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-latest
Then I connected to the SQL Server instance, created a database named PeopleDb, and created a table named People by running the following query:
USE [PeopleDb]
GO
CREATE TABLE [dbo].[People](
    [Id] [bigint] IDENTITY(1,1) NOT NULL,
    [FirstName] [varchar](50) NOT NULL,
    [LastName] [varchar](50) NOT NULL,
    CONSTRAINT [PK_People] PRIMARY KEY CLUSTERED ([Id] ASC)
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
Then I ran the commands below to enable CDC:
USE PeopleDb
GO
EXEC sys.sp_cdc_enable_db
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'People', @role_name = Null, @filegroup_name = N'Primary',@supports_net_changes = 0
EXEC sys.sp_cdc_help_change_data_capture
I noticed that the CDC table was created under 'System Tables'. When I run the query below, a record is added to the cdc.dbo_People_CT table:
INSERT INTO dbo.People (FirstName, LastName) VALUES ('John', 'Smith')
Then I ran the Kafka Connect docker command:
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mssql:mssql debezium/connect:1.1
Then I deployed a connector by POSTing the JSON below to http://localhost:8083/connectors/:
{
"name": "people-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "mssql",
"database.port": "1433",
"database.user": "sa",
"database.password": "yourStrong(!)Password",
"database.dbname": "PeopleDb",
"database.server.id": "184054",
"database.server.name": "mssql",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "mssql.dbo.people",
"name": "people-connector"
},
"tasks": [],
"type": "source"
}
I verified that the newly added connector is running by checking http://localhost:8083/connectors/people-connector/status:
{"name":"people-connector","connector":{"state":"RUNNING","worker_id":"172.17.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.5:8083"}],"type":"source"}
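The same check can be scripted. The sketch below is only an illustration: `all_running` is a hypothetical helper, not part of Kafka Connect, and it parses the status JSON shown above rather than calling the REST API.

```python
import json


def all_running(status: dict) -> bool:
    """Return True if the connector and every one of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Status response copied from the question.
status_json = (
    '{"name":"people-connector",'
    '"connector":{"state":"RUNNING","worker_id":"172.17.0.5:8083"},'
    '"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.5:8083"}],'
    '"type":"source"}'
)

print(all_running(json.loads(status_json)))  # True
```

A RUNNING state only says that the connector and its task started; it does not say which topics the change events are written to.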
I ran the watcher docker command:
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.1 watch-topic -a -k mssql.dbo.people
This produced the following output:
WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.6:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic mssql.dbo.people:
null {
"source" : {
"server" : "mssql"
},
"position" : {
"transaction_id" : null,
"event_serial_no" : 1,
"commit_lsn" : "00000025:000003f8:0003",
"change_lsn" : "NULL"
},
"databaseName" : "PeopleDb",
"schemaName" : "dbo",
"tableChanges" : [ {
"type" : "CREATE",
"id" : "\"PeopleDb\".\"dbo\".\"People\"",
"table" : {
"defaultCharsetName" : null,
"primaryKeyColumnNames" : [ "Id" ],
"columns" : [ {
"name" : "Id",
"jdbcType" : -5,
"typeName" : "bigint identity",
"typeExpression" : "bigint identity",
"charsetName" : null,
"length" : 19,
"scale" : 0,
"position" : 1,
"optional" : false,
"autoIncremented" : false,
"generated" : false
}, {
"name" : "FirstName",
"jdbcType" : 12,
"typeName" : "varchar",
"typeExpression" : "varchar",
"charsetName" : null,
"length" : 50,
"position" : 2,
"optional" : false,
"autoIncremented" : false,
"generated" : false
}, {
"name" : "LastName",
"jdbcType" : 12,
"typeName" : "varchar",
"typeExpression" : "varchar",
"charsetName" : null,
"length" : 50,
"position" : 3,
"optional" : false,
"autoIncremented" : false,
"generated" : false
} ]
}
} ]
}
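After completing these steps and confirming that the connector is running, I expected to see new events when records are inserted into, updated in, or deleted from the People table, but the watcher shows no activity. Does anyone know why there appears to be a disconnect between Debezium and SQL Server?
According to the Debezium SQL Server Connector docs: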
The SQL Server connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. The name of the Kafka topics always takes the form serverName.schemaName.tableName, where serverName is the logical name of the connector as specified with the database.server.name configuration property, schemaName is the name of the schema where the operation occurred, and tableName is the name of the database table on which the operation occurred.
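In your case, to see change events for the dbo.People table on the logical server mssql (the value of database.server.name), you need to watch mssql.dbo.People (note that topic names in Kafka are case-sensitive). The topic you are watching, mssql.dbo.people, is the one you configured as database.history.kafka.topic, which is why it contains only the schema history record for the table.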
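As a minimal sketch of that naming rule, the topic for a table can be derived from the connector config and compared against the topic that was being watched. Python is used here only for illustration, and `change_topic` is a hypothetical helper, not part of Debezium.

```python
def change_topic(server_name: str, schema: str, table: str) -> str:
    """Build a change-event topic name of the form serverName.schemaName.tableName."""
    return f"{server_name}.{schema}.{table}"


# Values copied from the connector config and table definition in the question.
config = {
    "database.server.name": "mssql",
    "database.history.kafka.topic": "mssql.dbo.people",
}

expected = change_topic(config["database.server.name"], "dbo", "People")
watched = "mssql.dbo.people"  # topic passed to watch-topic in the question

print(expected)                                            # mssql.dbo.People
print(watched == expected)                                 # False: names are case-sensitive
print(watched == config["database.history.kafka.topic"])   # True: this is the schema history topic
```

With that name, the watcher command from the question would end in `watch-topic -a -k mssql.dbo.People` instead of `mssql.dbo.people`.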