Spark Hive:无法检索 DataFrame 的列

Spark Hive: Can't retrieve column of DataFrame

我正在尝试使用 Hive 上的 Spark。在代码中,我创建了一个新的 DataFrame 并使用 HiveContext.createDataFrame 方法用自定义数据填充它:

JavaSparkContext sc = ...;
HiveContext hiveCtx = new HiveContext(sc);

StructField f1 = new StructField("columnA", DataTypes.StringType, false, null);
StructField f2 = new StructField("columnB", DataTypes.StringType, false, null);

StructType st = new StructType(new StructField[] {f1, f2});

Row r1 = RowFactory.create("A", "B");
Row r2 = RowFactory.create("C", "D");

List<Row> allRows = new ArrayList<Row>();
allRows.add(r1);
allRows.add(r2);

DataFrame testDF = hiveCtx.createDataFrame(allRows, st);

testDF.explain();                           // show the DF data

for(String col : testDF.columns()) {        // list the columns, all seems to be ok here?!
    System.out.println(col);
}

Column columnA = testDF.col("columnA"); // get the column --> exception!!!

...

当我通过 spark-submit 命令 运行 上面的代码时,我得到以下输出:

=== APP RUNNING ===
17/03/13 12:20:29 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/03/13 12:20:29 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/03/13 12:20:29 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/03/13 12:20:29 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/03/13 12:20:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/03/13 12:20:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/03/13 12:20:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/03/13 12:20:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/03/13 12:20:33 INFO metastore: Trying to connect to metastore with URI thrift://my-server-url:9083
17/03/13 12:20:33 INFO metastore: Connected to metastore.
== Physical Plan ==
LocalTableScan [columnA#0,columnB#1], [[A,B],[C,D]]
columnA
columnB
Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:218)
        at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
        at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
        at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
        at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
        at scala.Tuple2.hashCode(Tuple2.scala:19)
        at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:391)
        at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
        at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
        at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
        at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
        at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
        at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
        at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
        at scala.collection.SeqLike$$anonfun$distinct.apply(SeqLike.scala:494)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.SeqLike$class.distinct(SeqLike.scala:493)
        at scala.collection.AbstractSeq.distinct(Seq.scala:40)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:264)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:191)
        at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
        at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
        at temp.HiveTest.main(HiveTest.java:57)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

这是我的 spark-submit 电话:

spark-submit --class temp.HiveTest --master yarn --deploy-mode client /home/daniel/application.jar

为什么要给DataFrame.col(...)打电话NullPointerException??

尝试将 null 更改为 Metadata.empty():

StructField f1 = new StructField("columnA", DataTypes.StringType, false, Metadata.empty());
StructField f2 = new StructField("columnB", DataTypes.StringType, false, Metadata.empty());