在 KSQLDB 中插入具有嵌套结构的数组
Inserting arrays with nested structure in KSQLDB
我正在尝试将嵌套数组对象插入到 KSQL table。我的table结构如下:
CREATE TABLE nlpArticlesTrain ("articleText" VARCHAR,
"ner" ARRAY<STRUCT<"text" VARCHAR, "label" VARCHAR>>,
"rel" ARRAY<STRUCT<"head" VARCHAR, "tail" VARCHAR, "rel" VARCHAR, "prob" DOUBLE>>)
WITH (KAFKA_TOPIC = 'nlpArticlesTrain', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='AVRO');
我知道我可以使用支持 Avro 的标准 Kafka 生产者进行推送,但我正在寻找一种方法将 INSER INTO VALUES
转换为 table,以便填充下面的 stream/topic。在 documentation of query with structured data 中,我缺少一个插入示例。
INSERT INTO nlpArticlesTrain (articleText,ner,rel) VALUES ("string", [{..}],[{..}])
不起作用。
更具体的例子:
INSERT INTO nlpArticlesTrain (articleText,ner,rel) VALUES ("some", [{'text': 'The Mexican Ministry of Health', 'label': 'ORG'}, {'text': 'Tuesday', 'label': 'DATE'}, {'text': 'at least 29', 'label': 'CARDINAL'}], [{'head': 'The Mexican Ministry of Health', 'tail': 'Tuesday', 'rel': 'subsidiary', 'prob': 0.3873162269592285}])
这取决于您使用的 ksqlDB 版本。更高版本支持 ARRAY
和 STRUCT
构造函数,这正是您所需要的。例如:
CREATE STREAM TEST (K STRING KEY, A ARRAY<STRUCT<FOO INT>>)
WITH (kafka_topic='test_topic', value_format='JSON');
INSERT INTO TEST (A) VALUES (ARRAY[
STRUCT(FOO := 1),
STRUCT(FOO := 2)
]);
我正在尝试将嵌套数组对象插入到 KSQL table。我的table结构如下:
CREATE TABLE nlpArticlesTrain ("articleText" VARCHAR,
"ner" ARRAY<STRUCT<"text" VARCHAR, "label" VARCHAR>>,
"rel" ARRAY<STRUCT<"head" VARCHAR, "tail" VARCHAR, "rel" VARCHAR, "prob" DOUBLE>>)
WITH (KAFKA_TOPIC = 'nlpArticlesTrain', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='AVRO');
我知道我可以使用支持 Avro 的标准 Kafka 生产者进行推送,但我正在寻找一种方法将 INSER INTO VALUES
转换为 table,以便填充下面的 stream/topic。在 documentation of query with structured data 中,我缺少一个插入示例。
INSERT INTO nlpArticlesTrain (articleText,ner,rel) VALUES ("string", [{..}],[{..}])
不起作用。
更具体的例子:
INSERT INTO nlpArticlesTrain (articleText,ner,rel) VALUES ("some", [{'text': 'The Mexican Ministry of Health', 'label': 'ORG'}, {'text': 'Tuesday', 'label': 'DATE'}, {'text': 'at least 29', 'label': 'CARDINAL'}], [{'head': 'The Mexican Ministry of Health', 'tail': 'Tuesday', 'rel': 'subsidiary', 'prob': 0.3873162269592285}])
这取决于您使用的 ksqlDB 版本。更高版本支持 ARRAY
和 STRUCT
构造函数,这正是您所需要的。例如:
CREATE STREAM TEST (K STRING KEY, A ARRAY<STRUCT<FOO INT>>)
WITH (kafka_topic='test_topic', value_format='JSON');
INSERT INTO TEST (A) VALUES (ARRAY[
STRUCT(FOO := 1),
STRUCT(FOO := 2)
]);