debezium sqlserver 连接器输出 numeric/decimal 字段的编码值
debezium sqlserver connector outputs encoded values for numeric/decimal fields
我在 SQL 服务器中有这个 table:
CREATE TABLE [dbo].[blast_info](
[blast_id] [int] NOT NULL,
[tnt_amount_kg] [decimal](18, 2) NOT NULL,
[time_blasted] [datetime] NOT NULL,
[hole_deep_ft] [numeric](9, 2) NULL,
[hole_coord_n] [numeric](18, 6) NOT NULL,
[hole_coord_e] [numeric](18, 6) NULL
) ON [PRIMARY]
debezium 配置为 运行 作为 confluent 的插件。数据已发布到 Kafka 中,但是当我通过 python 或控制台消费者读取它时,我看到了数字和小数类型的编码值:
{"blast_id":17,"tnt_amount_kg":"eA==","time_blasted":1585803600000,"hole_deep_ft":"AOY=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437125}
{"blast_id":16,"tnt_amount_kg":"ANw=","time_blasted":1583125200000,"hole_deep_ft":"Aa4=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437125}
{"blast_id":17,"tnt_amount_kg":"eA==","time_blasted":1585803600000,"hole_deep_ft":"AOY=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437126}
Processed a total of 38 messages
为什么会这样,解决方法是什么?谢谢
可能的快速修复(完全不推荐):
只需在 SQL 服务器中使用 real
数据类型,而不是 numeric
或 decimal
,因为 Debezium 会将 real
存储为 float
。
长期修复:
如 Debezium SQL server Connector Documentation 中所述,它将 decimal
和 numeric
值存储为 binary
,由 class org.apache.kafka.connect.data.Decimal
表示。
您可以从消息本身检索此信息,但为此您需要在消息中启用架构。您可以通过设置 key.converter.schemas.enable=true
(对于消息键)和 value.converter.schemas.enable=true
(对于消息值)来实现。
更改以上属性后,您的邮件将包含架构信息。
参考这个例子:
Table 架构:
CREATE TABLE [dbo].[kafka_datatype](
[id] [int] IDENTITY(1,1) PRIMARY KEY,
[col_value] [varchar](10) NULL,
[create_date] [datetime] NULL,
[col_decimal] [decimal](33, 18) NULL,
[col_double] [real] NULL,
[comments] [varchar](5000) NULL
)
卡夫卡消息:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "col_value"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "create_date"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "18",
"connect.decimal.precision": "33"
},
"field": "col_decimal"
},
{
"type": "float",
"optional": true,
"field": "col_double"
},
{
"type": "string",
"optional": true,
"field": "comments"
}
],
"optional": true,
"name": "TEST.dbo.kafka_datatype.Value"
},
"payload": {
"id": 15,
"col_value": "test",
"create_date": 1586335960297,
"col_decimal": "AKg/JYrONaAA",
"col_double": 12.12345,
"comments": null
}
}
请阅读 Debezium SQL server Connector Documentation 以了解 Debezium 如何处理数据类型。
现在来到消费者部分,根据您的需要使用接收器连接器(例如JDBC Sink Connector)。如果您想使用 python 或控制台消费者,您需要编写自己的解串器。
P.S.:
随着时间的推移可能会出现一个问题,主题大小会随着每条消息存储模式而增加。为避免将架构持久化到消息中,您可以使用 Avro Converter
Schema Registry 由 Confluent 提供。
希望对您有所帮助!
我在 SQL 服务器中有这个 table:
CREATE TABLE [dbo].[blast_info](
[blast_id] [int] NOT NULL,
[tnt_amount_kg] [decimal](18, 2) NOT NULL,
[time_blasted] [datetime] NOT NULL,
[hole_deep_ft] [numeric](9, 2) NULL,
[hole_coord_n] [numeric](18, 6) NOT NULL,
[hole_coord_e] [numeric](18, 6) NULL
) ON [PRIMARY]
debezium 配置为 运行 作为 confluent 的插件。数据已发布到 Kafka 中,但是当我通过 python 或控制台消费者读取它时,我看到了数字和小数类型的编码值:
{"blast_id":17,"tnt_amount_kg":"eA==","time_blasted":1585803600000,"hole_deep_ft":"AOY=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437125}
{"blast_id":16,"tnt_amount_kg":"ANw=","time_blasted":1583125200000,"hole_deep_ft":"Aa4=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437125}
{"blast_id":17,"tnt_amount_kg":"eA==","time_blasted":1585803600000,"hole_deep_ft":"AOY=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437126}
Processed a total of 38 messages
为什么会这样,解决方法是什么?谢谢
可能的快速修复(完全不推荐):
只需在 SQL 服务器中使用 real
数据类型,而不是 numeric
或 decimal
,因为 Debezium 会将 real
存储为 float
。
长期修复:
如 Debezium SQL server Connector Documentation 中所述,它将 decimal
和 numeric
值存储为 binary
,由 class org.apache.kafka.connect.data.Decimal
表示。
您可以从消息本身检索此信息,但为此您需要在消息中启用架构。您可以通过设置 key.converter.schemas.enable=true
(对于消息键)和 value.converter.schemas.enable=true
(对于消息值)来实现。
更改以上属性后,您的邮件将包含架构信息。 参考这个例子:
Table 架构:
CREATE TABLE [dbo].[kafka_datatype](
[id] [int] IDENTITY(1,1) PRIMARY KEY,
[col_value] [varchar](10) NULL,
[create_date] [datetime] NULL,
[col_decimal] [decimal](33, 18) NULL,
[col_double] [real] NULL,
[comments] [varchar](5000) NULL
)
卡夫卡消息:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "col_value"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "create_date"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "18",
"connect.decimal.precision": "33"
},
"field": "col_decimal"
},
{
"type": "float",
"optional": true,
"field": "col_double"
},
{
"type": "string",
"optional": true,
"field": "comments"
}
],
"optional": true,
"name": "TEST.dbo.kafka_datatype.Value"
},
"payload": {
"id": 15,
"col_value": "test",
"create_date": 1586335960297,
"col_decimal": "AKg/JYrONaAA",
"col_double": 12.12345,
"comments": null
}
}
请阅读 Debezium SQL server Connector Documentation 以了解 Debezium 如何处理数据类型。
现在来到消费者部分,根据您的需要使用接收器连接器(例如JDBC Sink Connector)。如果您想使用 python 或控制台消费者,您需要编写自己的解串器。
P.S.:
随着时间的推移可能会出现一个问题,主题大小会随着每条消息存储模式而增加。为避免将架构持久化到消息中,您可以使用 Avro Converter Schema Registry 由 Confluent 提供。
希望对您有所帮助!