使用 Spark 将 json 映射到大小写 class(字段名称中的空格)
Mapping json to case class with Spark (spaces in the field name)
我正在尝试使用 spark Dataset
API 读取一个 json 文件,问题是这个 json 在某些地方包含 space字段名称。
这将是 json 行
{"Field Name" : "value"}
我的情况class需要这样
case class MyType(`Field Name`: String)
然后我可以将文件加载到 DataFrame
中,它将加载正确的架构
val dataframe = spark.read.json(path)
当我尝试将 DataFrame
转换为 Dataset[MyType]
时出现问题
dataframe.as[MyType]
Encoder[MyType]
加载的 StructSchema 是错误的,它引入了 $u0020
而不是 space,我得到以下错误
cannot resolve '`Field$u0020Name`' given input columns: [Field Name];
org.apache.spark.sql.AnalysisException: cannot resolve '`Field$u0020Name`' given input columns: [Field Name];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:88)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:256)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:206)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
at org.apache.spark.sql.Dataset.as(Dataset.scala:380)
at com.radius.floodgate.preprocess.BomboraSuite$$anonfun.apply$mcV$sp(BomboraSuite.scala:151)
at com.radius.floodgate.preprocess.BomboraSuite$$anonfun.apply(BomboraSuite.scala:141)
at com.radius.floodgate.preprocess.BomboraSuite$$anonfun.apply(BomboraSuite.scala:141)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon.apply(FunSuiteLike.scala:186)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$class.invokeWithFixture(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$runTests.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at com.radius.floodgate.preprocess.BomboraSuite.org$scalatest$BeforeAndAfterAll$$super$run(BomboraSuite.scala:18)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at com.radius.floodgate.preprocess.BomboraSuite.run(BomboraSuite.scala:18)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
是否有解决此问题的方法?
解决方法是创建一个不带 space(下划线)的列名并重命名 DF 列以匹配大小写 class 列名。
case class MyType(Field_Name: String)
dataframe.withColumnRenamed("Field Name", "Field_Name").as[MyType]
仅供参考,我已经创建了一个关于此问题的 Spark 错误,他们正在修复它 https://issues.apache.org/jira/browse/SPARK-22442
我正在尝试使用 spark Dataset
API 读取一个 json 文件,问题是这个 json 在某些地方包含 space字段名称。
这将是 json 行
{"Field Name" : "value"}
我的情况class需要这样
case class MyType(`Field Name`: String)
然后我可以将文件加载到 DataFrame
中,它将加载正确的架构
val dataframe = spark.read.json(path)
当我尝试将 DataFrame
转换为 Dataset[MyType]
dataframe.as[MyType]
Encoder[MyType]
加载的 StructSchema 是错误的,它引入了 $u0020
而不是 space,我得到以下错误
cannot resolve '`Field$u0020Name`' given input columns: [Field Name];
org.apache.spark.sql.AnalysisException: cannot resolve '`Field$u0020Name`' given input columns: [Field Name];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:88)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:256)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:206)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
at org.apache.spark.sql.Dataset.as(Dataset.scala:380)
at com.radius.floodgate.preprocess.BomboraSuite$$anonfun.apply$mcV$sp(BomboraSuite.scala:151)
at com.radius.floodgate.preprocess.BomboraSuite$$anonfun.apply(BomboraSuite.scala:141)
at com.radius.floodgate.preprocess.BomboraSuite$$anonfun.apply(BomboraSuite.scala:141)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon.apply(FunSuiteLike.scala:186)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$class.invokeWithFixture(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$runTests.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at com.radius.floodgate.preprocess.BomboraSuite.org$scalatest$BeforeAndAfterAll$$super$run(BomboraSuite.scala:18)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at com.radius.floodgate.preprocess.BomboraSuite.run(BomboraSuite.scala:18)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
是否有解决此问题的方法?
解决方法是创建一个不带 space(下划线)的列名并重命名 DF 列以匹配大小写 class 列名。
case class MyType(Field_Name: String)
dataframe.withColumnRenamed("Field Name", "Field_Name").as[MyType]
仅供参考,我已经创建了一个关于此问题的 Spark 错误,他们正在修复它 https://issues.apache.org/jira/browse/SPARK-22442