Spark Structured Streaming to read nested Kafka Connect jsonConverter message
I have ingested XML files using the Kafka Connect FilePulse connector 1.5.3, and I now want to parse/flatten them with Spark Structured Streaming, since the payload is quite nested.
The string I read from Kafka (using the console consumer; a newline was inserted before "payload" for readability) looks like this:
{
"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"city"},{"type":"array","items":{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"unit"},{"type":"string","optional":true,"field":"value"}],"optional":true,"name":"Value"},"optional":true,"field":"value"}],"optional":true,"name":"ForcedArrayType"},"optional":true,"field":"forcedArrayField"},{"type":"string","optional":true,"field":"lastField"}],"optional":true,"name":"Data","field":"data"}],"optional":true}
,"payload":{"data":{"city":"someCity","forcedArrayField":[{"value":[{"unit":"unitField1","value":"123"},{"unit":"unitField1","value":"456"}]}],"lastField":"2020-08-02T18:02:00"}}
}
The schema I tried:
StructType schema = new StructType();
schema = schema.add("schema", StringType, false);
schema = schema.add("payload", StringType, false);
StructType Data = new StructType();
StructType ValueArray = new StructType(new StructField[]{
        new StructField("unit", StringType, true, Metadata.empty()),
        new StructField("value", StringType, true, Metadata.empty())
});
StructType ForcedArrayType = new StructType(new StructField[]{
        new StructField("valueArray", ValueArray, true, Metadata.empty())
});
Data = Data.add("city", StringType, true);
Data = Data.add("forcedArrayField", ForcedArrayType, true);
Data = Data.add("lastField", StringType, true);
StructType Record = new StructType();
Record = Record.add("data", Data, false);
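(The lines Dataset used in the query below is not shown in the question; presumably it is a standard Kafka source, sketched here with an assumed broker address and topic name:)
// Hypothetical source for the `lines` Dataset used in the query below;
// the broker address and topic name are assumptions, not from the question.
Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "some-topic")
        .load();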
The query I tried:
// below worked for payload
Dataset<Row> parsePayload = lines
        .selectExpr("cast (value as string) as json")
        .select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
        .select("schemaAndPayload.payload").as("payload");
System.out.println(parsePayload.isStreaming());
//below makes the output empty:
Dataset<Row> parseValue = parsePayload.select(functions.from_json(functions.col("payload"), Record).as("cols"))
.select(functions.col("cols.data.city"));
//.select(functions.col("cols.*"));
StreamingQuery query = parseValue
.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
query.awaitTermination();
When I output the parsePayload stream, I can see the data (still in its JSON structure), but when I try to select certain/all fields, such as city above, the result is empty.
Need help: is the schema defined incorrectly, or is the query wrong?
P.S. On the console, when I output 'parsePayload' instead of 'parseValue', it displays some data, which makes me think the 'payload' part works:
|{"data":{"city":"...|
...
Your schema definition is wrong; payload and schema may not be the columns/fields. Read the data as static JSON (spark.read.json) to get the schema, then use that schema in Structured Streaming.
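A minimal sketch of that approach, assuming one raw message has been saved to a local file named sample.json (a hypothetical path):
// Infer the full schema (including the nested payload) from a static copy of the message
Dataset<Row> staticDf = spark.read().json("sample.json");  // "sample.json" is a hypothetical path
staticDf.printSchema();  // reuse the printed schema in the streaming query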
Your schema definition does not seem to be entirely correct. Replicating your issue, I was able to parse the JSON with the following schema:
val payloadSchema = new StructType()
.add("data", new StructType()
.add("city", StringType)
.add("forcedArrayField", ArrayType(new StructType()
.add("value", ArrayType(new StructType()
.add("unit", StringType)
.add("value", StringType)))))
.add("lastField", StringType))
To then access the individual fields, I used the following select:
val parsePayload = df
.selectExpr("cast (value as string) as json")
.select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
.select("schemaAndPayload.payload").as("payload")
.select(functions.from_json(functions.col("payload"), payloadSchema).as("cols"))
.select(col("cols.data.city").as("city"), explode(col("cols.data.forcedArrayField")).as("forcedArrayField"), col("cols.data.lastField").as("lastField"))
.select(col("city"), explode(col("forcedArrayField.value").as("middleFields")), col("lastField"))
This gives the output:
+--------+-----------------+-------------------+
| city| col| lastField|
+--------+-----------------+-------------------+
|someCity|[unitField1, 123]|2020-08-02T18:02:00|
|someCity|[unitField1, 456]|2020-08-02T18:02:00|
+--------+-----------------+-------------------+
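Note that the exploded struct appears under the default column name col because the .as("middleFields") alias ended up inside the explode call (it aliases the input column, not explode's output); writing explode(col("forcedArrayField.value")).as("middleFields") instead would name the output column, and a final select on the struct's unit and value fields would flatten the rows completely.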