How to get org.apache.kafka.connect.data.Decimal value from Kafka JSON message
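I use Debezium to stream PostgreSQL data to Kafka, and I subscribe to the Kafka topic from Java.
When I receive a Kafka message I get a JSON string, but it contains one numeric value that I cannot interpret.
The JSON is: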
{
"schema":
{
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "creator"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "createtime"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters":
{
"scale": "5",
"connect.decimal.precision": "32"
},
"field": "familyprice"
}],
"optional": true,
"name": "pssdev.public.order.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "creator"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "createtime"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters":
{
"scale": "5",
"connect.decimal.precision": "32"
},
"field": "familyprice"
}],
"optional": true,
"name": "pssdev.public.order.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": true,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "int64",
"optional": true,
"field": "ts_usec"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "string",
"optional": true,
"field": "schema"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "boolean",
"optional": true,
"field": "last_snapshot_record"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}],
"optional": false,
"name": "pssdev.public.order.Envelope"
},
"payload":
{
"before":
{
"creator": null,
"createtime": null,
"familyprice": null
},
"after":
{
"creator": "USER1E",
"createtime": 1554292597815000,
"familyprice": "W42A"
},
"source":
{
"version": "0.9.5.Final",
"connector": "postgresql",
"name": "pssdev",
"db": "pf",
"ts_usec": 1561459811737920,
"txId": 771604,
"lsn": 88282458880,
"schema": "public",
"table": "order",
"snapshot": false,
"last_snapshot_record": null,
"xmin": null
},
"op": "u",
"ts_ms": 1561459811747
}
}
The familyprice value is W42A.
I don't know how to convert it.
The actual familyprice value in the database is 60.00000, and the column type is numeric(32,5).
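For Java clients, see the Debezium FAQ entry at https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
You can also set the connector option decimal.handling.mode to a different value (string or double instead of the default, precise), so that you receive the Decimal as a string or as a double, if that is easier for you.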
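To illustrate the conversion on the value from the question, here is a minimal sketch. It assumes the JSON converter wrote the bytes field as a Base64 string, and that the decoded bytes are the big-endian two's-complement unscaled value, which is how the Kafka Connect Decimal logical type is defined. The scale (5) comes from the "scale" entry under "parameters" in the schema. The class name DecimalDecoder and the method decode are hypothetical names chosen for this example, not part of Debezium or Kafka Connect.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Hypothetical helper, for illustration only.
final class DecimalDecoder {

    // base64: the field value from the JSON payload, e.g. "W42A"
    // scale:  the "scale" parameter from the field's schema, e.g. 5
    static BigDecimal decode(String base64, int scale) {
        // Base64 text -> raw bytes of the unscaled integer
        byte[] unscaledBytes = Base64.getDecoder().decode(base64);
        // BigInteger(byte[]) reads big-endian two's-complement bytes
        BigInteger unscaled = new BigInteger(unscaledBytes);
        // Apply the scale: unscaled * 10^(-scale)
        return new BigDecimal(unscaled, scale);
    }

    public static void main(String[] args) {
        // "W42A" decodes to bytes 0x5B 0x8D 0x80 = 6000000; with scale 5 that is 60.00000
        System.out.println(decode("W42A", 5)); // prints 60.00000
    }
}
```

In a real consumer the scale would be read from the message schema rather than hard-coded, and a null familyprice (as in the "before" block) needs to be checked before decoding.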