SparkSQL key not found: scale

The Spark version is 1.6.0.

I am trying to run a simple SQL query against a remote Oracle 11g database using Spark SQL.

The ojdbc driver is on the classpath, of course, and the database host is reachable (ping is OK).

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SparkConf conf = new SparkConf().setAppName(APP_NAME).setMaster("yarn-client");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(jsc);

// JDBC options for the remote Oracle source
Map<String, String> connectionProperties = new HashMap<>();
connectionProperties.put("user", username);
connectionProperties.put("password", password);
connectionProperties.put("url", url);
connectionProperties.put("dbtable", "(SELECT * FROM tableName)");
connectionProperties.put("driver", "oracle.jdbc.OracleDriver");

DataFrame result = sqlContext.read().format("jdbc").options(connectionProperties).load();

The error occurs on the last line, in the .load() call.

The resulting stack trace is:

Exception in thread "main" java.util.NoSuchElementException: key not found: scale
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.apache.spark.sql.types.Metadata.get(Metadata.scala:108)
    at org.apache.spark.sql.types.Metadata.getLong(Metadata.scala:51)
    at org.apache.spark.sql.jdbc.OracleDialect$.getCatalystType(OracleDialect.scala:33)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:140)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
    at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at myapp.dfComparator.entity.OriginalSourceTable.load(OriginalSourceTable.java:74)
    at myapp.dfComparator.Program.main(Program.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I have no idea what is going wrong here.

EDIT 01/27/2017

Additional information:

The Hadoop version is 2.6.0-cdh5.8.3.

The Spark version is 1.6.0 with Scala 2.10.5.

I tried to reproduce the code above in Scala and execute it with spark-shell:
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:system/system@db-host:1521:orcl",
    "dbtable" -> "schema_name.table_name",
    "driver" -> "oracle.jdbc.OracleDriver",
    "user" -> "user",
    "password" -> "pwd")).load()

This code produces a similar stack trace:

java.util.NoSuchElementException: key not found: scale
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.apache.spark.sql.types.Metadata.get(Metadata.scala:108)
    at org.apache.spark.sql.types.Metadata.getLong(Metadata.scala:51)
    at org.apache.spark.sql.jdbc.OracleDialect$.getCatalystType(OracleDialect.scala:33)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:140)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
    at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
    at $iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC.<init>(<console>:40)
    at $iwC.<init>(<console>:42)
    at <init>(<console>:44)
    at .<init>(<console>:48)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:821)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
    at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

So I strongly suspect that something is wrong in the Spark and/or Hadoop configuration.

EDIT 02/01/2017

I have found that the problem only occurs when a column in the Oracle table has type NUMBER. For example, if I cast the id column (of type NUMBER) to VARCHAR in the select statement, everything works fine:

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:system/system@db-host:1521:orcl",
    "dbtable" -> "(SELECT CAST(id AS varchar(3)) FROM tableName)",
    "driver" -> "oracle.jdbc.OracleDriver",
    "user" -> "user",
    "password" -> "pwd")).load()

In more detail, the stack trace tells us that the problem occurs inside the org.apache.spark.sql.types.Metadata.get method, called from OracleDialect.getCatalystType. Looking at this method in the sources, we can see (or assume) that for a NUMBER column it tries to read the column's scale from the metadata as a Long and cannot find it.
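For reference, OracleDialect.getCatalystType in the Apache Spark 1.6 sources looks roughly like the following. This is a simplified sketch, not the exact CDH code, shown only to illustrate where the exception comes from:

// Simplified sketch of org.apache.spark.sql.jdbc.OracleDialect.getCatalystType
// as it appears in the Apache Spark 1.6 sources (line numbers may differ in CDH).
import java.sql.Types
import org.apache.spark.sql.types._

def getCatalystType(
    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
  if (sqlType == Types.NUMERIC && size == 0) {
    // NUMBER with no declared precision/scale
    Some(DecimalType(DecimalType.MAX_PRECISION, 10))
  } else if (sqlType == Types.NUMERIC && md.build().getLong("scale") == -127) {
    // Metadata.getLong("scale") throws NoSuchElementException when the "scale"
    // key was never put into the column metadata -- which is what the trace shows.
    Some(DecimalType(DecimalType.MAX_PRECISION, 10))
  } else {
    None
  }
}

Apparently the column metadata built by JDBCRDD.resolveTable in this build never contains a "scale" entry, so the else-if branch fails for every NUMBER column with a non-zero size.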

That is why I now think the main problem is in the Cloudera distribution of Apache Spark.

At this point I have received an answer from Cloudera: this is a known issue in CDH 5.8.3 with Spark 1.6, and it is resolved in Spark 2.0.

To overcome this issue, we have three options:

  • CAST any numeric column to VARCHAR in the select query. Instead of CAST we can also use the TO_CHAR function.

  • Load the data using an RDD/JavaRDD (instead of a DataFrame) and then convert it to a DataFrame (which is actually faster); see the sketch after this list.

  • Upgrade to CDH 5.9.x.
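For the second option, a minimal spark-shell sketch using JdbcRDD could look like the following. The table and column names, the partition bounds, and the MyRow case class are assumptions for illustration; sc and sqlContext are the ones provided by spark-shell:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// Columns we want to read; names and types are placeholders.
case class MyRow(id: String, name: String)

val jdbcRdd = new JdbcRDD(
  sc,
  () => {
    Class.forName("oracle.jdbc.OracleDriver")
    DriverManager.getConnection("jdbc:oracle:thin:system/system@db-host:1521:orcl")
  },
  // JdbcRDD requires exactly two '?' placeholders for the partition bounds.
  "SELECT id, name FROM schema_name.table_name WHERE id >= ? AND id <= ?",
  1L,        // lower bound of the partitioning column
  1000000L,  // upper bound of the partitioning column
  4,         // number of partitions
  (rs: ResultSet) => MyRow(rs.getString(1), rs.getString(2))
)

// Converting the RDD to a DataFrame bypasses OracleDialect entirely,
// because we choose the Scala types ourselves (NUMBER is read as a String here).
import sqlContext.implicits._
val df = jdbcRdd.toDF()
df.printSchema()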