KsqlDB 从 TextNode 到 Array 的转换
KsqlDB Converting from TextNode to Array
TL;DR:在 ksqlDB 中,有没有办法将 TextNode
转换为 Array<VARCHAR>
,以便可以无误地执行 EXPLODE
?
全新的 ksqlDB 和 运行 遇到一个奇怪的问题。我正在使用 debezium -> ksqldb 进行 ETLing,数据正在流动,这很棒。问题是,当我使用 EXPLODE 函数时,它无法解析,因为我想成为一个 ARRAY 实际上是一个 TEXTNODE。这是一个来自 Postgres 的简化数据结构,其中 data
是 postgres 中的一个 JSONB:
{
"id": "b5b55e07-15d7-4559-8319-18a67205ea4d",
"data": [
"d728fef0-9eec-4dec-b9b6-04b5444431f6",
"7a475d25-ec73-41c3-9fbc-0a62e96d887a"
]
}
我的 debezium 连接器正在使用 KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
设置和输出(主题输出有我忽略的额外字段):
SET 'auto.offset.reset' = 'earliest';
CREATE SOURCE CONNECTOR forms_reader WITH (
'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
'database.hostname' = 'db',
'database.port' = '5432',
'database.user' = 'master',
'database.password' = 'secret',
'database.dbname' = 'forms',
'database.server.name' = 'forms',
'table.whitelist' = 'public.response_version',
'transforms' = 'unwrap',
'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.drop.tombstones' = 'false',
'transforms.unwrap.delete.handling.mode' = 'rewrite'
);
print 'forms.public.response_version' from beginning;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/12/17 21:42:29.879 Z, key: [Struct{id=b5b55e07-15d7-4559-8319-18a67@3616449210317300861/-], value: {"id":"b5b55e07-15d7-4559-8319-18a67205ea4d","response_id":"403fc75f-97fa-4f06-9f66-bebff6b458c7","data":"[\"d728fef0-9eec-4dec-b9b6-04b5444431f6\", \"7a475d25-ec73-41c3-9fbc-0a62e96d887a\"]","created_by":"b5166a61-71bb-4e50-b445-afcc64d46b5e","created_at":1608217959510550,"previous_response_version_id":null,"form_version_id":"6b4f9c86-4984-4d05-9e7e-8e51b97189b3","__deleted":"false"}
创建流:
CREATE STREAM response_versions(
id VARCHAR KEY,
data ARRAY<String>)
WITH(kafka_topic='forms.public.response_version', value_format='JSON');
CREATE STREAM response_fields
WITH(value_format='JSON')
AS SELECT id AS rv_id,
EXPLODE(data) AS data_field
FROM response_versions EMIT CHANGES;
创建第二个流的日志中出错:
org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: forms.public.response_version. Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
有人对此有解决方案吗?理想情况下,我想将 TextNode 放入数组中,但我不确定如何。
谢谢,
帕特里克
错误是由于 response_versions
流无法将字符串读入数组造成的。要进行转换,您必须创建该流以读取字符串,然后将字符串转换为数组。
为了转换 str -> arry,我不得不用 regexp_replace
删除括号,然后用 regexp_split_to_array
.
将字符串拆分为一个数组
CREATE STREAM response_versions(
id VARCHAR KEY,
data STRING)
WITH(kafka_topic='forms.public.response_version', value_format='JSON');
CREATE STREAM response_fields
WITH(value_format='JSON')
AS SELECT
id AS rv_id,
EXPLODE(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(data, '\[|\]', ''), '\s*,\s*')) as AS data_field
FROM response_versions EMIT CHANGES;
以防万一有人在这里绊倒,对 avro 模式做同样的事情并想将文本节点解析为 avro 数组。这里以 BOOLEAN 为例,将其替换为您需要的类型:
cast(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '[|]', ''), 's*,s*') as array<boolean>)
因为可以使用 CAST 将数组转换为数组。这是一个内置查询示例。创建流并将字符串布尔数组转换为布尔数组。我的流中有 dtype 告诉数据类型,但它也可能包含非数组,所以我使用 instr,只将数组放入转换中:
create stream array_stream as SELECT rowkey, date, nodeid, dtype, cast(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '[|]', ''), 's*,s*') as array<boolean>) as value_transformed FROM stream_defined_on_topic where instr(value, ']')>0 and dtype='Boolean';
现在可以在定义的流之上创建一个 table:
create table array_stream_table as select nodeid, collect_list(dtype), collect_list(array_join(value_transformed)) from array_stream window tumbling (size 1 seconds) group by nodeid;
希望这也能澄清上述 AVRO 模式的答案。我花了一段时间才弄明白。
TL;DR:在 ksqlDB 中,有没有办法将 TextNode
转换为 Array<VARCHAR>
,以便可以无误地执行 EXPLODE
?
全新的 ksqlDB 和 运行 遇到一个奇怪的问题。我正在使用 debezium -> ksqldb 进行 ETLing,数据正在流动,这很棒。问题是,当我使用 EXPLODE 函数时,它无法解析,因为我想成为一个 ARRAY 实际上是一个 TEXTNODE。这是一个来自 Postgres 的简化数据结构,其中 data
是 postgres 中的一个 JSONB:
{
"id": "b5b55e07-15d7-4559-8319-18a67205ea4d",
"data": [
"d728fef0-9eec-4dec-b9b6-04b5444431f6",
"7a475d25-ec73-41c3-9fbc-0a62e96d887a"
]
}
我的 debezium 连接器正在使用 KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
设置和输出(主题输出有我忽略的额外字段):
SET 'auto.offset.reset' = 'earliest';
CREATE SOURCE CONNECTOR forms_reader WITH (
'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
'database.hostname' = 'db',
'database.port' = '5432',
'database.user' = 'master',
'database.password' = 'secret',
'database.dbname' = 'forms',
'database.server.name' = 'forms',
'table.whitelist' = 'public.response_version',
'transforms' = 'unwrap',
'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.drop.tombstones' = 'false',
'transforms.unwrap.delete.handling.mode' = 'rewrite'
);
print 'forms.public.response_version' from beginning;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/12/17 21:42:29.879 Z, key: [Struct{id=b5b55e07-15d7-4559-8319-18a67@3616449210317300861/-], value: {"id":"b5b55e07-15d7-4559-8319-18a67205ea4d","response_id":"403fc75f-97fa-4f06-9f66-bebff6b458c7","data":"[\"d728fef0-9eec-4dec-b9b6-04b5444431f6\", \"7a475d25-ec73-41c3-9fbc-0a62e96d887a\"]","created_by":"b5166a61-71bb-4e50-b445-afcc64d46b5e","created_at":1608217959510550,"previous_response_version_id":null,"form_version_id":"6b4f9c86-4984-4d05-9e7e-8e51b97189b3","__deleted":"false"}
创建流:
CREATE STREAM response_versions(
id VARCHAR KEY,
data ARRAY<String>)
WITH(kafka_topic='forms.public.response_version', value_format='JSON');
CREATE STREAM response_fields
WITH(value_format='JSON')
AS SELECT id AS rv_id,
EXPLODE(data) AS data_field
FROM response_versions EMIT CHANGES;
创建第二个流的日志中出错:
org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: forms.public.response_version. Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
有人对此有解决方案吗?理想情况下,我想将 TextNode 放入数组中,但我不确定如何。
谢谢, 帕特里克
错误是由于 response_versions
流无法将字符串读入数组造成的。要进行转换,您必须创建该流以读取字符串,然后将字符串转换为数组。
为了转换 str -> arry,我不得不用 regexp_replace
删除括号,然后用 regexp_split_to_array
.
CREATE STREAM response_versions(
id VARCHAR KEY,
data STRING)
WITH(kafka_topic='forms.public.response_version', value_format='JSON');
CREATE STREAM response_fields
WITH(value_format='JSON')
AS SELECT
id AS rv_id,
EXPLODE(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(data, '\[|\]', ''), '\s*,\s*')) as AS data_field
FROM response_versions EMIT CHANGES;
以防万一有人在这里绊倒,对 avro 模式做同样的事情并想将文本节点解析为 avro 数组。这里以 BOOLEAN 为例,将其替换为您需要的类型:
cast(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '[|]', ''), 's*,s*') as array<boolean>)
因为可以使用 CAST 将数组转换为数组。这是一个内置查询示例。创建流并将字符串布尔数组转换为布尔数组。我的流中有 dtype 告诉数据类型,但它也可能包含非数组,所以我使用 instr,只将数组放入转换中:
create stream array_stream as SELECT rowkey, date, nodeid, dtype, cast(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '[|]', ''), 's*,s*') as array<boolean>) as value_transformed FROM stream_defined_on_topic where instr(value, ']')>0 and dtype='Boolean';
现在可以在定义的流之上创建一个 table:
create table array_stream_table as select nodeid, collect_list(dtype), collect_list(array_join(value_transformed)) from array_stream window tumbling (size 1 seconds) group by nodeid;
希望这也能澄清上述 AVRO 模式的答案。我花了一段时间才弄明白。