如何从具有十进制类型列的主题在 ksql 中创建流
how to create stream in ksql from topic with decimal type column
我想从监控 mysql table 的 kafka 主题创建一个流。 mysql table 具有 decimal(16,4) 类型的列,当我使用此命令创建流时:
create stream test with (KAFKA_TOPIC='dbServer.Kafka.DailyUdr',VALUE_FORMAT='AVRO');
流已创建并且 运行 但具有 decimal(16,4) 类型的列不会出现在结果流中。
source topic value schema:
{
"type": "record",
"name": "Envelope",
"namespace": "dbServer.Kafka.DailyUdr",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "UserId",
"type": "int"
},
{
"name": "NationalCode",
"type": "string"
},
{
"name": "TotalInputOcted",
"type": "int"
},
{
"name": "TotalOutputOcted",
"type": "int"
},
{
"name": "Date",
"type": "string"
},
{
"name": "Service",
"type": "string"
},
{
"name": "decimalCol",
"type": [
"null",
{
"type": "bytes",
"scale": 4,
"precision": 16,
"connect.version": 1,
"connect.parameters": {
"scale": "4",
"connect.decimal.precision": "16"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
],
"connect.name": "dbServer.Kafka.DailyUdr.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
},
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.mysql",
"fields": [
{
"name": "version",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "connector",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "name",
"type": "string"
},
{
"name": "server_id",
"type": "long"
},
{
"name": "ts_sec",
"type": "long"
},
{
"name": "gtid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "file",
"type": "string"
},
{
"name": "pos",
"type": "long"
},
{
"name": "row",
"type": "int"
},
{
"name": "snapshot",
"type": [
{
"type": "boolean",
"connect.default": false
},
"null"
],
"default": false
},
{
"name": "thread",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "db",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "table",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "query",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.debezium.connector.mysql.Source"
}
},
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": [
"null",
"long"
],
"default": null
}
],
"connect.name": "dbServer.Kafka.DailyUdr.Envelope"
}
我的问题出在 decimalCol 列
KSQL 尚不支持 DECIMAL 数据类型。
有 an issue here 如果您认为有用,您可以跟踪和投票。
我想从监控 mysql table 的 kafka 主题创建一个流。 mysql table 具有 decimal(16,4) 类型的列,当我使用此命令创建流时:
create stream test with (KAFKA_TOPIC='dbServer.Kafka.DailyUdr',VALUE_FORMAT='AVRO');
流已创建并且 运行 但具有 decimal(16,4) 类型的列不会出现在结果流中。
source topic value schema: { "type": "record", "name": "Envelope", "namespace": "dbServer.Kafka.DailyUdr", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "UserId", "type": "int" }, { "name": "NationalCode", "type": "string" }, { "name": "TotalInputOcted", "type": "int" }, { "name": "TotalOutputOcted", "type": "int" }, { "name": "Date", "type": "string" }, { "name": "Service", "type": "string" }, { "name": "decimalCol", "type": [ "null", { "type": "bytes", "scale": 4, "precision": 16, "connect.version": 1, "connect.parameters": { "scale": "4", "connect.decimal.precision": "16" }, "connect.name": "org.apache.kafka.connect.data.Decimal", "logicalType": "decimal" } ], "default": null } ], "connect.name": "dbServer.Kafka.DailyUdr.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.mysql", "fields": [ { "name": "version", "type": [ "null", "string" ], "default": null }, { "name": "connector", "type": [ "null", "string" ], "default": null }, { "name": "name", "type": "string" }, { "name": "server_id", "type": "long" }, { "name": "ts_sec", "type": "long" }, { "name": "gtid", "type": [ "null", "string" ], "default": null }, { "name": "file", "type": "string" }, { "name": "pos", "type": "long" }, { "name": "row", "type": "int" }, { "name": "snapshot", "type": [ { "type": "boolean", "connect.default": false }, "null" ], "default": false }, { "name": "thread", "type": [ "null", "long" ], "default": null }, { "name": "db", "type": [ "null", "string" ], "default": null }, { "name": "table", "type": [ "null", "string" ], "default": null }, { "name": "query", "type": [ "null", "string" ], "default": null } ], "connect.name": "io.debezium.connector.mysql.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], "default": null } ], "connect.name": "dbServer.Kafka.DailyUdr.Envelope" }
我的问题出在 decimalCol 列
KSQL 尚不支持 DECIMAL 数据类型。
有 an issue here 如果您认为有用,您可以跟踪和投票。