在 KSQLDB 中插入具有嵌套结构的数组

Inserting arrays with nested structure in KSQLDB

我正在尝试将嵌套数组对象插入到 KSQL table。我的table结构如下:

CREATE TABLE nlpArticlesTrain ("articleText" VARCHAR,
  "ner" ARRAY<STRUCT<"text" VARCHAR, "label" VARCHAR>>,
  "rel" ARRAY<STRUCT<"head" VARCHAR, "tail" VARCHAR, "rel" VARCHAR, "prob" DOUBLE>>)
  WITH (KAFKA_TOPIC = 'nlpArticlesTrain', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='AVRO');

我知道我可以使用支持 Avro 的标准 Kafka 生产者进行推送,但我正在寻找一种方法将 INSER INTO VALUES 转换为 table,以便填充下面的 stream/topic。在 documentation of query with structured data 中,我缺少一个插入示例。

INSERT INTO nlpArticlesTrain (articleText,ner,rel) VALUES ("string", [{..}],[{..}]) 不起作用。

更具体的例子:

INSERT INTO nlpArticlesTrain (articleText,ner,rel) VALUES ("some", [{'text': 'The Mexican Ministry of Health', 'label': 'ORG'}, {'text': 'Tuesday', 'label': 'DATE'}, {'text': 'at least 29', 'label': 'CARDINAL'}], [{'head': 'The Mexican Ministry of Health', 'tail': 'Tuesday', 'rel': 'subsidiary', 'prob': 0.3873162269592285}])

这取决于您使用的 ksqlDB 版本。更高版本支持 ARRAYSTRUCT 构造函数,这正是您所需要的。例如:

CREATE STREAM TEST (K STRING KEY, A ARRAY<STRUCT<FOO INT>>) 
    WITH (kafka_topic='test_topic', value_format='JSON');

INSERT INTO TEST (A) VALUES (ARRAY[
   STRUCT(FOO := 1),
   STRUCT(FOO := 2)
]);