Spark Hive: Can't retrieve column of DataFrame
I'm trying to use Spark on Hive. In my code, I create a new DataFrame and populate it with custom data via the HiveContext.createDataFrame method:
JavaSparkContext sc = ...;
HiveContext hiveCtx = new HiveContext(sc);
StructField f1 = new StructField("columnA", DataTypes.StringType, false, null);
StructField f2 = new StructField("columnB", DataTypes.StringType, false, null);
StructType st = new StructType(new StructField[] {f1, f2});
Row r1 = RowFactory.create("A", "B");
Row r2 = RowFactory.create("C", "D");
List<Row> allRows = new ArrayList<Row>();
allRows.add(r1);
allRows.add(r2);
DataFrame testDF = hiveCtx.createDataFrame(allRows, st);
testDF.explain(); // print the physical plan of the DataFrame
for (String col : testDF.columns()) { // list the columns, all seems to be ok here?!
    System.out.println(col);
}
Column columnA = testDF.col("columnA"); // get the column --> exception!!!
...
When I run the code above via spark-submit, I get the following output:
=== APP RUNNING ===
17/03/13 12:20:29 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/03/13 12:20:29 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/03/13 12:20:29 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/03/13 12:20:29 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/03/13 12:20:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/03/13 12:20:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/03/13 12:20:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/03/13 12:20:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/03/13 12:20:33 INFO metastore: Trying to connect to metastore with URI thrift://my-server-url:9083
17/03/13 12:20:33 INFO metastore: Connected to metastore.
== Physical Plan ==
LocalTableScan [columnA#0,columnB#1], [[A,B],[C,D]]
columnA
columnB
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:218)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
at scala.Tuple2.hashCode(Tuple2.scala:19)
at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:391)
at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
at scala.collection.SeqLike$$anonfun$distinct.apply(SeqLike.scala:494)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.SeqLike$class.distinct(SeqLike.scala:493)
at scala.collection.AbstractSeq.distinct(Seq.scala:40)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:191)
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
at temp.HiveTest.main(HiveTest.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Here is my spark-submit invocation:
spark-submit --class temp.HiveTest --master yarn --deploy-mode client /home/daniel/application.jar
Why does the call to DataFrame.col(...) throw a NullPointerException?
The NullPointerException is thrown from AttributeReference.hashCode (the top frame of the stack trace), which hashes each field's metadata; passing null as the metadata argument leaves nothing to hash. Try changing null to Metadata.empty():
StructField f1 = new StructField("columnA", DataTypes.StringType, false, Metadata.empty());
StructField f2 = new StructField("columnB", DataTypes.StringType, false, Metadata.empty());
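In Spark's Scala API the metadata parameter of StructField defaults to Metadata.empty, but the Java constructor has no such default, which is how the null slipped through. If you would rather not pass the metadata argument at all, here is a minimal sketch (assuming the Spark 1.x Java API) that builds the same schema through the DataTypes factory methods; the three-argument createStructField supplies Metadata.empty() internally:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Build each field without an explicit Metadata argument; the factory
// method fills in Metadata.empty(), so no field carries a null reference.
StructField f1 = DataTypes.createStructField("columnA", DataTypes.StringType, false);
StructField f2 = DataTypes.createStructField("columnB", DataTypes.StringType, false);
StructType st = DataTypes.createStructType(new StructField[] {f1, f2});

List<Row> allRows = new ArrayList<Row>();
allRows.add(RowFactory.create("A", "B"));
allRows.add(RowFactory.create("C", "D"));

// hiveCtx is the HiveContext from the question
DataFrame testDF = hiveCtx.createDataFrame(allRows, st);
Column columnA = testDF.col("columnA"); // resolves without the NullPointerException

Either way, the key point is the same: never hand Spark a null Metadata reference.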