Append transformed columns to a Spark DataFrame using Scala
I am trying to access a Hive table, extract and transform certain columns from that table/dataframe, and then put those new columns into a new dataframe.
I am attempting it this way -
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveDF = sqlContext.sql("select * from table_x")
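// Column references taken from hiveDF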
val system_generated_id = hiveDF("unique_key")
val application_assigned_event_id = hiveDF("event_event_id")
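// New, empty dataframe that the transformed columns should be appended to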
val trnEventDf = sqlContext.emptyDataFrame
trnEventDf.withColumn("system_generated_id",lit(system_generated_id))
It builds with sbt without any errors, but when I try to run it I get the following error -
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$$anonfun$applyOrElse.apply(Analyzer.scala:354)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$$anonfun$applyOrElse.apply(Analyzer.scala:353)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply.applyOrElse(Analyzer.scala:353)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply.applyOrElse(Analyzer.scala:347)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:347)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:328)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$$anonfun$apply.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$$anonfun$apply.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:707)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1188)
at bacon$.main(bacon.scala:31)
at bacon.main(bacon.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
I would like to understand what is causing this error, and whether there is another way to accomplish what I am trying to do.
Generally you don't need to create a new df for this. A Column such as hiveDF("unique_key") is bound to the DataFrame it was taken from, so it cannot be attached to sqlContext.emptyDataFrame; the withColumn call on the empty DataFrame is what fails during analysis with "requirement failed". Instead, transform hiveDF itself by adding or renaming the columns you need, and you get the desired df. If you want to keep the result, just save it as a new Hive table.
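For example, a minimal sketch under the same HiveContext setup as the question (the column and source table names come from the question; the output table name table_y is only illustrative):
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Read the source Hive table.
val hiveDF = sqlContext.sql("select * from table_x")
// Derive the new DataFrame from hiveDF itself: select the source columns
// and rename them, instead of attaching them to an empty DataFrame.
val trnEventDf = hiveDF.select(
  hiveDF("unique_key").as("system_generated_id"),
  hiveDF("event_event_id").as("application_assigned_event_id")
)
// Optionally persist the result as a new Hive table.
trnEventDf.write.saveAsTable("table_y")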