JSON Array to JSON object using KSTREAM or KSQL
I have data arriving in Kafka in the following format:
{"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}
I want to transform it into this:
{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}
I tried to flatten it with KSQL, but KSQL doesn't support arrays yet.
I also tried to flatten it with KStreams using the following code:
builder.stream(inputTopic).flatMapValues(Object -> Arrays.asList()).to(outputTopic);
But it doesn't produce any output. Any help would be appreciated.
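Arrays.asList() called with no arguments only creates an empty list, so flatMapValues emits zero records for every input record.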
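flatMapValues emits one output record per element of the Iterable that the mapper returns, so a mapper that always returns an empty list drops every record. The plain-Java model below has no Kafka dependency; the FlatMapModel.flatMapValues helper is hypothetical and only mimics that one-to-many contract, to show why the original topology writes nothing to outputTopic:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for KStream#flatMapValues: not the Kafka API,
// just the same "one input value -> zero or more output values" contract.
class FlatMapModel {
    static <V, R> List<R> flatMapValues(List<V> input, Function<V, Iterable<R>> mapper) {
        List<R> output = new ArrayList<>();
        for (V value : input) {
            // Every element of the returned Iterable becomes one output record
            for (R r : mapper.apply(value)) {
                output.add(r);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> records = List.of("record-1", "record-2");
        // Same mapper as the question: ignores the value and returns an empty list
        List<Object> out = flatMapValues(records, value -> Arrays.asList());
        System.out.println(out.size()); // prints 0: nothing would reach the output topic
    }
}
```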
Add the actual logic to extract the array from the input value and return it as something that implements Iterable (for example an ArrayList). Here I use flatMapValues with Jackson:
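builder.<String, JsonNode>stream(inputTopic)
        .flatMapValues((ValueMapper<JsonNode, Iterable<JsonNode>>) value -> {
            // Take the "WHS" array out of the wrapper object; each element becomes its own record
            ArrayNode arrayNode = (ArrayNode) value.get("WHS");
            return arrayNode::iterator;
        })
        .to(outputTopic);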
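One caveat with the Jackson mapper: a record without a "WHS" field makes value.get("WHS") return null, so arrayNode::iterator throws a NullPointerException, and a "WHS" that is not an array fails the ArrayNode cast. The sketch below models the same unwrapping with only the JDK, assuming the parsed record is a Map and the "WHS" array is a List of Maps (no real JSON parsing; UnwrapWhs is a hypothetical name). It returns an empty list for malformed records instead of failing:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the Jackson mapper: a parsed record is a Map and
// "WHS" is a List of Maps (assumption: stands in for ObjectNode/ArrayNode).
class UnwrapWhs {
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> unwrap(Map<String, Object> record) {
        Object whs = record.get("WHS");
        // Missing or non-array "WHS": emit nothing instead of throwing
        if (!(whs instanceof List)) {
            return Collections.emptyList();
        }
        return (List<Map<String, Object>>) whs;
    }

    public static void main(String[] args) {
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("Character Set", "UTF-8");
        element.put("action", "finished");
        element.put("Number", 0);
        Map<String, Object> record = Map.of("WHS", List.of(element));
        // Each element of WHS would become its own record on the output topic
        for (Map<String, Object> out : unwrap(record)) {
            System.out.println(out); // prints {Character Set=UTF-8, action=finished, Number=0}
        }
    }
}
```

In the Jackson version the equivalent guard would be a check such as value.get("WHS") being a non-null array node before casting.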
KSQL / ksqlDB does support arrays. Here is how you can do what you need with it:
-- Declare the stream
CREATE STREAM TEST1
(WHS ARRAY<STRUCT<"action" VARCHAR
, "Update-Date-Time" VARCHAR
, "Number" VARCHAR
, "Abbr" VARCHAR
, "Name" VARCHAR
, "Name2" VARCHAR
, "Country-Code" VARCHAR
, "Addr-1" VARCHAR
, "Addr-2" VARCHAR
, "Addr-4" VARCHAR
, "City" VARCHAR
, "State" VARCHAR>>)
WITH (KAFKA_TOPIC ='test1'
,VALUE_FORMAT='JSON');
-- Read from the beginning of the topic
SET 'auto.offset.reset' = 'earliest';
-- Query the array
ksql> SELECT WHS FROM TEST1 EMIT CHANGES LIMIT 1;
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|WHS |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}] |
Limit Reached
Query terminated
ksql>
-- Flatten the array
ksql> SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES LIMIT 1;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=} |
Limit Reached
Query terminated
ksql>
You can write the result to another stream (topic):
ksql> CREATE STREAM TEST1_EXPLODE WITH (KAFKA_TOPIC='NEW_TEST1') AS SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES;
Message
-------------------------------------------
Created query with ID CSAS_TEST1_EXPLODE_155
-------------------------------------------
ksql> PRINT NEW_TEST1;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"KSQL_COL_0":{"ACTION":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","NUMBER":"0","ABBR":"","NAME":"","NAME2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","STATE":""}}
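Because the query selects EXPLODE(WHS) without an alias, each array element arrives as one struct inside a single column, auto-named KSQL_COL_0 in the output above, rather than as top-level fields; an array with n elements produces n rows. The following is a rough plain-Java model of that output shape, not ksqlDB code: ExplodeModel is a hypothetical name, the column name is copied from the PRINT output, and the second array element is made up to show the one-row-per-element behaviour:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ExplodeModel {
    // One output row per array element; the element stays nested under a single column
    static List<Map<String, Object>> explode(List<Map<String, Object>> whs, String columnName) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (Map<String, Object> element : whs) {
            rows.add(Map.of(columnName, element));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Hypothetical two-element WHS array
        List<Map<String, Object>> whs = List.of(
                Map.of("action", "finished"),
                Map.of("action", "started"));
        // Two array elements -> two rows, each wrapped under KSQL_COL_0
        for (Map<String, Object> row : explode(whs, "KSQL_COL_0")) {
            System.out.println(row);
        }
    }
}
```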
If you also want to flatten the resulting struct into top-level fields, you can do that too:
CREATE STREAM TEST1_FLATTENED AS SELECT EXPLODE(WHS)->"action" AS "action" ,
EXPLODE(WHS)->"Update-Date-Time" AS "Update-Date-Time" ,
EXPLODE(WHS)->"Number" AS "Number" ,
EXPLODE(WHS)->"Abbr" AS "Abbr" ,
EXPLODE(WHS)->"Name" AS "Name" ,
EXPLODE(WHS)->"Name2" AS "Name2" ,
EXPLODE(WHS)->"Country-Code" AS "Country-Code" ,
EXPLODE(WHS)->"Addr-1" AS "Addr-1" ,
EXPLODE(WHS)->"Addr-2" AS "Addr-2" ,
EXPLODE(WHS)->"Addr-4" AS "Addr-4" ,
EXPLODE(WHS)->"City" AS "City" ,
EXPLODE(WHS)->"State" AS "State"
FROM TEST1 EMIT CHANGES;
ksql> PRINT TEST1_FLATTENED;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":"0","Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","State":""}
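The flattened stream contains only the fields named in the SELECT list: each EXPLODE(WHS)->"field" AS "field" lifts one struct field to a top-level column. The TEST1 schema never declares "Character Set" or "Addr-3", so they are missing from the output above; to keep them, they would have to be added both to the STRUCT<...> declaration and to the select list. A plain-Java model of that projection, with a hypothetical FlattenModel.project helper and an abridged field list:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FlattenModel {
    // Copy only the listed fields, in select-list order, into a flat row
    static Map<String, Object> project(Map<String, Object> struct, List<String> fields) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String field : fields) {
            // A selected field absent from the input becomes null
            row.put(field, struct.get(field));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("Character Set", "UTF-8");
        element.put("action", "finished");
        element.put("Addr-3", "");
        element.put("City", "");
        // Abridged select list: "Character Set" and "Addr-3" are not selected, so they are dropped
        List<String> selected = List.of("action", "City");
        System.out.println(project(element, selected)); // prints {action=finished, City=}
    }
}
```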