Is Spark DataFrame nested structure limited for selection?
I have a json file with some data from which I can create a DataFrame, and the schema of the specific part I am interested in looks like this:
val json: DataFrame = sqlc.load("entities_with_address2.json", "json")
root
|-- attributes: struct (nullable = true)
| |-- Address2: array (nullable = true)
| | |-- value: struct (nullable = true)
| | | |-- Zip: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- value: struct (nullable = true)
| | | | | | |-- Zip5: array (nullable = true)
| | | | | | | |-- element: struct (containsNull = true)
| | | | | | | | |-- value: string (nullable = true)
When I try to select the deepest field:
json.select("attributes.Address2.value.Zip.value.Zip5").collect()
it gives me an exception:
org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value, StructType(StructField(Zip5, ArrayType(StructType(StructField(value, StringType, true)), true), true)), true)), true), true);
Looking at LogicalPlan's resolveGetField method, I see that you can select from a StructType or an ArrayType(StructType), but is there any way to select deeper than that? How can I select the field I need?
Here is the full exception:
org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true);
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun.apply(LogicalPlan.scala:214)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun.apply(LogicalPlan.scala:214)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:46)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:252)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:252)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp(QueryPlan.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$$anonfun$apply.apply(QueryPlan.scala:123)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:122)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:46)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:44)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:40)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:476)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:491)
at com.reltio.analytics.PREDF.test(PREDF.scala:55)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
The problem is the ArrayType -- you can reproduce this error very simply:
val df = Seq(Tuple1(Array[String]())).toDF("users")
At this point df.printSchema shows:
root
|-- users: array (nullable = true)
| |-- element: string (containsNull = true)
Now if you try:
df.select($"users.element")
you get exactly the same exception -- GetField is not valid...
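For completeness, here is a minimal, self-contained sketch of that reproduction (assuming a Spark 1.3-era setup where sc is an existing SparkContext; the implicits import is what provides toDF and the $"..." column syntax):

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)  // sc: an existing SparkContext
import sqlContext.implicits._        // brings toDF and $"..." into scope

val df = Seq(Tuple1(Array[String]())).toDF("users")
df.select($"users.element")          // throws AnalysisException: GetField is not valid ...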
You have a couple of different options for unwinding the Array. You can grab individual items with getItem like this:
df.select($"users".getItem(0))
And since getItem returns another Column, you can dig as deep as you want:
df.select($"attributes.Address2".getItem(0).getField("value").getField("Zip").getItem(...)
// etc
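To make that concrete against the schema in the question, the full chain would look something like the sketch below. This is illustrative only: the getItem(0) indices just pick the first element at each array level, since every array level needs an explicit index before the next getField.

json.select(
  $"attributes.Address2".getItem(0)   // pick one Address2 element
    .getField("value")
    .getField("Zip").getItem(0)       // pick one Zip element
    .getField("value")
    .getField("Zip5").getItem(0)      // pick one Zip5 element
    .getField("value")                // the string field at the bottom
)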
But with arrays, you probably want to unwind the whole array programmatically. If you look at how Hive handles this, you would do a LATERAL VIEW. In Spark, you have to use explode to create the equivalent of a Hive LATERAL VIEW:
case class User(name: String)
df.explode($"users"){ case Row(arr: Array[String]) => arr.map(User(_)) }
Note that I used a case class in my map -- that's what the documentation shows. If you don't want to create a case class, you can return a Tuple1 (or a Tuple2, Tuple3, etc.) instead:
df.explode($"users"){ case Row(arr: Array[String]) => arr.map(Tuple1(_)) }