使用火花流解析事件中心复杂数组类型消息
Parsing Event Hub Complex Array Type messages using spark streaming
我需要在读取 eventhub 时解析正文中的数组类型。我们嵌套了 json 消息但无法解析相同的消息:
{"姓名": "Rohit","薪水": "29292","EmpID": 12,"项目": [{"项目ID": "9191","项目名称": "abc", "Duration": "79"},{"ProjectID": "9192","ProjectName": "xyz","Duration": "75"}]}
我正在尝试使用以下方法修改模式:但似乎存在一些问题。
val testSchema = new StructType()
.add("Name", StringType)
.add("Salary", StringType)
.add("EmpID", StringType)
.add("Projects", new ArrayType(new StructType()
.add("ProjectID", StringType)
.add("ProjectName", StringType)
.add("Duration", StringType)))
任何帮助将不胜感激。
很难说没有确切的错误,但看起来您在模式定义中有错误 - 您需要修改模式以添加指定数组元素是否可为空的标志(请参阅 true
嵌套 StructType 之后的标志)。当直接读取 JSON 或从字符串转换它时,以下模式工作正常:
val testSchema = new StructType()
.add("Name", StringType)
.add("Salary", StringType)
.add("EmpID", StringType)
.add("Projects", new ArrayType(new StructType()
.add("ProjectID", StringType)
.add("ProjectName", StringType)
.add("Duration", StringType), true))
scala> spark.read.schema(testSchema).json("file.json").show(truncate=false)
+-----+------+-----+----------------------------------+
|Name |Salary|EmpID|Projects |
+-----+------+-----+----------------------------------+
|Rohit|29292 |12 |[{9191, abc, 79}, {9192, xyz, 75}]|
+-----+------+-----+----------------------------------+
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val df = spark.read.text("file.json")
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.select(from_json($"value", testSchema)).show(truncate=false)
+------------------------------------------------------+
|from_json(value) |
+------------------------------------------------------+
|{Rohit, 29292, 12, [{9191, abc, 79}, {9192, xyz, 75}]}|
+------------------------------------------------------+
我需要在读取 eventhub 时解析正文中的数组类型。我们嵌套了 json 消息但无法解析相同的消息:
{"姓名": "Rohit","薪水": "29292","EmpID": 12,"项目": [{"项目ID": "9191","项目名称": "abc", "Duration": "79"},{"ProjectID": "9192","ProjectName": "xyz","Duration": "75"}]}
我正在尝试使用以下方法修改模式:但似乎存在一些问题。
val testSchema = new StructType()
.add("Name", StringType)
.add("Salary", StringType)
.add("EmpID", StringType)
.add("Projects", new ArrayType(new StructType()
.add("ProjectID", StringType)
.add("ProjectName", StringType)
.add("Duration", StringType)))
任何帮助将不胜感激。
很难说没有确切的错误,但看起来您在模式定义中有错误 - 您需要修改模式以添加指定数组元素是否可为空的标志(请参阅 true
嵌套 StructType 之后的标志)。当直接读取 JSON 或从字符串转换它时,以下模式工作正常:
val testSchema = new StructType()
.add("Name", StringType)
.add("Salary", StringType)
.add("EmpID", StringType)
.add("Projects", new ArrayType(new StructType()
.add("ProjectID", StringType)
.add("ProjectName", StringType)
.add("Duration", StringType), true))
scala> spark.read.schema(testSchema).json("file.json").show(truncate=false)
+-----+------+-----+----------------------------------+
|Name |Salary|EmpID|Projects |
+-----+------+-----+----------------------------------+
|Rohit|29292 |12 |[{9191, abc, 79}, {9192, xyz, 75}]|
+-----+------+-----+----------------------------------+
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val df = spark.read.text("file.json")
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.select(from_json($"value", testSchema)).show(truncate=false)
+------------------------------------------------------+
|from_json(value) |
+------------------------------------------------------+
|{Rohit, 29292, 12, [{9191, abc, 79}, {9192, xyz, 75}]}|
+------------------------------------------------------+