如何通过 Debezium Connect 反序列化 Kafka 消息流中的几何字段?
How can I deserialize geometry fields from Kafka messages stream via Debezium Connect?
我有一个 PostGIS + Debezium/Kafka + Debezium/Connect 设置,可以将更改从一个数据库流式传输到另一个数据库。我一直在通过 Kowl 观看消息,一切都在相应地进行。
我的问题出在我阅读来自我的 Kafka 主题的消息时,尤其是几何 (wkb) 列。
这是我的 Kafka 消息:
{
"schema":{
"type":"struct"
"fields":[...]
"optional":false
"name":"ecotx_geometry_kafka.ecotx_geometry_impo..."
}
"payload":{
"before":NULL
"after":{
"id":"d6ad5eb9-d1cb-4f91-949c-7cfb59fb07e2"
"type":"MultiPolygon"
"layer_id":"244458fa-e6e0-4c6c-a7e1-5bf0afce2fb8"
"geometry":{
"wkb":"AQYAACBqCAAAAQAAAAEDAAAAAQAAAAUAAABwQfUo..."
"srid":2154
}
"custom_style":NULL
"style_id":"default_layer_style"
}
"source":{...}
"op":"c"
"ts_ms":1618854994546
"transaction":NULL
}
}
看起来,WKB 信息类似于“AQAAAAA...”,尽管我的数据库中插入的信息是“01060000208A7A000000000000”或“LINESTRING(0 0,1 0)”。
而且我不知道如何将它 parse/transform 到我的消费者应用程序 (Kotlin/Java) 中的 ByteArray 或几何以进一步在 GeoTools 中使用。
我不知道我是否缺少能够翻译此信息的导入。
我对发布 json 消息的人只有几个问题,每条具有 geom 字段的消息(使用 Debezium 流式传输)都已更改为此“AAAQQQAAAA”。
话虽如此,我怎样才能 parse/decoded/translate 将它变成 GeoTools 可以使用的东西?
谢谢。
@更新
附加信息:
插入后,当我分析我的插槽更改时(使用 pg_logical_slot_get_changes 函数查询数据库),我能够在 WKB 中看到我的更改:
{"change":[{"kind":"insert","schema":"ecotx_geometry_import","table":"geometry_data","columnnames":["id","type","layer_id","geometry","custom_style","style_id"],"columntypes":["uuid","character varying(255)","uuid","geometry","character varying","character varying"],"columnvalues":["469f5aed-a2ea-48ca-b7d2-fe6e54b27053","MultiPolygon","244458fa-e6e0-4c6c-a7e1-5bf0afce2fb8","01060000206A08000001000000010300000001000000050000007041F528CB332C413B509BE9710A594134371E05CC332C4111F40B87720A594147E56566CD332C4198DF5D7F720A594185EF3C8ACC332C41C03BEDE1710A59417041F528CB332C413B509BE9710A5941",null,"default_layer_style"]}]}
这在消费者应用程序中会有用,它肯定依赖于 Kafka 消息内容本身,只是不确定谁在转换这个值,如果 Kafka 或 DBZ/Connect。
我认为这只是在 PostGIS 和 JSON 中表示二进制列的不同方式。 WKB 是一个二进制字段,这意味着它具有任意值的字节,其中许多没有相应的可打印字符。 PostGIS 使用 HEX 编码将其打印出来,因此它看起来像 '01060000208A7A...' - 十六进制数字,但在内部它只是字节。 Kafka 的 JSON 使用 BASE64 编码来代替完全相同的二进制消息。
让我们用您的字符串前缀进行测试,
select to_base64(from_hex('01060000206A080000010000000103000000010000000500'))
AQYAACBqCAAAAQAAAAEDAAAAAQAAAAUA
我有一个 PostGIS + Debezium/Kafka + Debezium/Connect 设置,可以将更改从一个数据库流式传输到另一个数据库。我一直在通过 Kowl 观看消息,一切都在相应地进行。
我的问题出在我阅读来自我的 Kafka 主题的消息时,尤其是几何 (wkb) 列。
这是我的 Kafka 消息:
{
"schema":{
"type":"struct"
"fields":[...]
"optional":false
"name":"ecotx_geometry_kafka.ecotx_geometry_impo..."
}
"payload":{
"before":NULL
"after":{
"id":"d6ad5eb9-d1cb-4f91-949c-7cfb59fb07e2"
"type":"MultiPolygon"
"layer_id":"244458fa-e6e0-4c6c-a7e1-5bf0afce2fb8"
"geometry":{
"wkb":"AQYAACBqCAAAAQAAAAEDAAAAAQAAAAUAAABwQfUo..."
"srid":2154
}
"custom_style":NULL
"style_id":"default_layer_style"
}
"source":{...}
"op":"c"
"ts_ms":1618854994546
"transaction":NULL
}
}
看起来,WKB 信息类似于“AQAAAAA...”,尽管我的数据库中插入的信息是“01060000208A7A000000000000”或“LINESTRING(0 0,1 0)”。
而且我不知道如何将它 parse/transform 到我的消费者应用程序 (Kotlin/Java) 中的 ByteArray 或几何以进一步在 GeoTools 中使用。
我不知道我是否缺少能够翻译此信息的导入。
我对发布 json 消息的人只有几个问题,每条具有 geom 字段的消息(使用 Debezium 流式传输)都已更改为此“AAAQQQAAAA”。
话虽如此,我怎样才能 parse/decoded/translate 将它变成 GeoTools 可以使用的东西?
谢谢。
@更新
附加信息:
插入后,当我分析我的插槽更改时(使用 pg_logical_slot_get_changes 函数查询数据库),我能够在 WKB 中看到我的更改:
{"change":[{"kind":"insert","schema":"ecotx_geometry_import","table":"geometry_data","columnnames":["id","type","layer_id","geometry","custom_style","style_id"],"columntypes":["uuid","character varying(255)","uuid","geometry","character varying","character varying"],"columnvalues":["469f5aed-a2ea-48ca-b7d2-fe6e54b27053","MultiPolygon","244458fa-e6e0-4c6c-a7e1-5bf0afce2fb8","01060000206A08000001000000010300000001000000050000007041F528CB332C413B509BE9710A594134371E05CC332C4111F40B87720A594147E56566CD332C4198DF5D7F720A594185EF3C8ACC332C41C03BEDE1710A59417041F528CB332C413B509BE9710A5941",null,"default_layer_style"]}]}
这在消费者应用程序中会有用,它肯定依赖于 Kafka 消息内容本身,只是不确定谁在转换这个值,如果 Kafka 或 DBZ/Connect。
我认为这只是在 PostGIS 和 JSON 中表示二进制列的不同方式。 WKB 是一个二进制字段,这意味着它具有任意值的字节,其中许多没有相应的可打印字符。 PostGIS 使用 HEX 编码将其打印出来,因此它看起来像 '01060000208A7A...' - 十六进制数字,但在内部它只是字节。 Kafka 的 JSON 使用 BASE64 编码来代替完全相同的二进制消息。
让我们用您的字符串前缀进行测试,
select to_base64(from_hex('01060000206A080000010000000103000000010000000500'))
AQYAACBqCAAAAQAAAAEDAAAAAQAAAAUA