KsqlDB 从 TextNode 到 Array 的转换

KsqlDB Converting from TextNode to Array

TL;DR:在 ksqlDB 中,有没有办法将 TextNode 转换为 Array<VARCHAR>,以便可以无误地执行 EXPLODE

全新的 ksqlDB 和 运行 遇到一个奇怪的问题。我正在使用 debezium -> ksqldb 进行 ETLing,数据正在流动,这很棒。问题是,当我使用 EXPLODE 函数时,它无法解析,因为我想成为一个 ARRAY 实际上是一个 TEXTNODE。这是一个来自 Postgres 的简化数据结构,其中 data 是 postgres 中的一个 JSONB:

{
  "id": "b5b55e07-15d7-4559-8319-18a67205ea4d",
  "data": [
        "d728fef0-9eec-4dec-b9b6-04b5444431f6",
        "7a475d25-ec73-41c3-9fbc-0a62e96d887a"
  ]
}

我的 debezium 连接器正在使用 KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

设置和输出(主题输出有我忽略的额外字段):

SET 'auto.offset.reset' = 'earliest';

CREATE SOURCE CONNECTOR forms_reader WITH (
    'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname' = 'db',
    'database.port' = '5432',
    'database.user' = 'master',
    'database.password' = 'secret',
    'database.dbname' = 'forms',
    'database.server.name' = 'forms',
    'table.whitelist' = 'public.response_version',
    'transforms' = 'unwrap',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'transforms.unwrap.delete.handling.mode' = 'rewrite'
);

print 'forms.public.response_version' from beginning;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/12/17 21:42:29.879 Z, key: [Struct{id=b5b55e07-15d7-4559-8319-18a67@3616449210317300861/-], value: {"id":"b5b55e07-15d7-4559-8319-18a67205ea4d","response_id":"403fc75f-97fa-4f06-9f66-bebff6b458c7","data":"[\"d728fef0-9eec-4dec-b9b6-04b5444431f6\", \"7a475d25-ec73-41c3-9fbc-0a62e96d887a\"]","created_by":"b5166a61-71bb-4e50-b445-afcc64d46b5e","created_at":1608217959510550,"previous_response_version_id":null,"form_version_id":"6b4f9c86-4984-4d05-9e7e-8e51b97189b3","__deleted":"false"}

创建流:

CREATE STREAM response_versions(
    id VARCHAR KEY, 
    data ARRAY<String>)
    WITH(kafka_topic='forms.public.response_version', value_format='JSON');

CREATE STREAM response_fields
    WITH(value_format='JSON')
    AS SELECT id AS rv_id,
           EXPLODE(data) AS data_field
        FROM response_versions EMIT CHANGES;

创建第二个流的日志中出错:

org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: forms.public.response_version. Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: ARRAY<VARCHAR>, path: $.DATA

有人对此有解决方案吗?理想情况下,我想将 TextNode 放入数组中,但我不确定如何。

谢谢, 帕特里克

错误是由于 response_versions 流无法将字符串读入数组造成的。要进行转换,您必须创建该流以读取字符串,然后将字符串转换为数组。

为了转换 str -> arry,我不得不用 regexp_replace 删除括号,然后用 regexp_split_to_array.

将字符串拆分为一个数组
CREATE STREAM response_versions(
    id VARCHAR KEY, 
    data STRING)
    WITH(kafka_topic='forms.public.response_version', value_format='JSON');

CREATE STREAM response_fields
    WITH(value_format='JSON')
    AS SELECT 
       id AS rv_id, 
       EXPLODE(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(data, '\[|\]', ''), '\s*,\s*')) as AS data_field
FROM response_versions EMIT CHANGES;  

以防万一有人在这里绊倒,对 avro 模式做同样的事情并想将文本节点解析为 avro 数组。这里以 BOOLEAN 为例,将其替换为您需要的类型:

cast(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '[|]', ''), 's*,s*') as array<boolean>)

因为可以使用 CAST 将数组转换为数组。这是一个内置查询示例。创建流并将字符串布尔数组转换为布尔数组。我的流中有 dtype 告诉数据类型,但它也可能包含非数组,所以我使用 instr,只将数组放入转换中:

create stream array_stream as SELECT rowkey, date, nodeid, dtype, cast(REGEXP_SPLIT_TO_ARRAY(REGEXP_REPLACE(value, '[|]', ''), 's*,s*') as array<boolean>) as value_transformed FROM stream_defined_on_topic where instr(value, ']')>0 and dtype='Boolean';

现在可以在定义的流之上创建一个 table:

 create table array_stream_table as select nodeid, collect_list(dtype), collect_list(array_join(value_transformed)) from array_stream window tumbling (size 1 seconds) group by nodeid;

希望这也能澄清上述 AVRO 模式的答案。我花了一段时间才弄明白。