Spark SQL: 嵌套 类 到 parquet 错误
Spark SQL: Nested classes to parquet error
我似乎无法写信给 parquet a JavaRDD<T>
,其中 T 是一个 say,Person
class。我将其定义为
public class Person implements Serializable
{
private static final long serialVersionUID = 1L;
private String name;
private String age;
private Address address;
....
与 Address
:
public class Address implements Serializable
{
private static final long serialVersionUID = 1L;
private String City; private String Block;
...<getters and setters>
然后我创建一个 JavaRDD
像这样:
JavaRDD<Person> people = sc.textFile("/user/johndoe/spark/data/people.txt").map(new Function<String, Person>()
{
public Person call(String line)
{
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge("2");
Address address = new Address("HomeAdd","141H");
person.setAddress(address);
return person;
}
});
注意 - 我手动设置 Address
对所有人都一样。这基本上是一个嵌套的 RDD。在尝试将其保存为镶木地板文件时:
DataFrame dfschemaPeople = sqlContext.createDataFrame(people, Person.class);
dfschemaPeople.write().parquet("/user/johndoe/spark/data/out/people.parquet");
地址 class 是:
import java.io.Serializable;
public class Address implements Serializable
{
public Address(String city, String block)
{
super();
City = city;
Block = block;
}
private static final long serialVersionUID = 1L;
private String City;
private String Block;
//Omitting getters and setters
}
我遇到错误:
原因:java.lang.ClassCastException:com.test.schema.Address 无法转换为 org.apache.spark.sql.Row
我是 运行 spark-1.4.1。
- 这是已知错误吗?
- 如果我通过导入相同格式的嵌套 JSON 文件来做同样的事情,我就可以保存到 parquet。
- 即使我创建了一个像这样的子 DataFrame:
DataFrame dfSubset = sqlContext.sql("SELECT address.city FROM PersonTable");
我仍然得到同样的错误
那么是什么原因呢?如何从文本文件中读取复杂的数据结构并另存为 parquet?看来我做不到。
您使用的 java api 有限制
来自 spark 文档:
http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#interoperating-with-rdds
Spark SQL 支持自动将 JavaBeans 的 RDD 转换为 DataFrame。使用反射获得的 BeanInfo 定义了 table 的模式。目前,Spark SQL 不支持包含嵌套或复杂类型(如列表或数组)的 JavaBeans。您可以通过创建一个 class 来创建一个 JavaBean,该 class 实现了 Serializable 并为其所有字段提供了 getter 和 setter。
使用 scala case classes 它将工作(更新为写入 parquet 格式)
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
case class Address(city:String, block:String);
case class Person(name:String,age:String, address:Address);
object Test2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
import sqlContext.implicits._
val people = sc.parallelize(List(Person("a", "b", Address("a", "b")), Person("c", "d", Address("c", "d"))));
val df = sqlContext.createDataFrame(people);
df.write.mode("overwrite").parquet("/tmp/people.parquet")
}
}
我似乎无法写信给 parquet a JavaRDD<T>
,其中 T 是一个 say,Person
class。我将其定义为
public class Person implements Serializable
{
private static final long serialVersionUID = 1L;
private String name;
private String age;
private Address address;
....
与 Address
:
public class Address implements Serializable
{
private static final long serialVersionUID = 1L;
private String City; private String Block;
...<getters and setters>
然后我创建一个 JavaRDD
像这样:
JavaRDD<Person> people = sc.textFile("/user/johndoe/spark/data/people.txt").map(new Function<String, Person>()
{
public Person call(String line)
{
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge("2");
Address address = new Address("HomeAdd","141H");
person.setAddress(address);
return person;
}
});
注意 - 我手动设置 Address
对所有人都一样。这基本上是一个嵌套的 RDD。在尝试将其保存为镶木地板文件时:
DataFrame dfschemaPeople = sqlContext.createDataFrame(people, Person.class);
dfschemaPeople.write().parquet("/user/johndoe/spark/data/out/people.parquet");
地址 class 是:
import java.io.Serializable;
public class Address implements Serializable
{
public Address(String city, String block)
{
super();
City = city;
Block = block;
}
private static final long serialVersionUID = 1L;
private String City;
private String Block;
//Omitting getters and setters
}
我遇到错误:
原因:java.lang.ClassCastException:com.test.schema.Address 无法转换为 org.apache.spark.sql.Row
我是 运行 spark-1.4.1。
- 这是已知错误吗?
- 如果我通过导入相同格式的嵌套 JSON 文件来做同样的事情,我就可以保存到 parquet。
- 即使我创建了一个像这样的子 DataFrame:
DataFrame dfSubset = sqlContext.sql("SELECT address.city FROM PersonTable");
我仍然得到同样的错误
那么是什么原因呢?如何从文本文件中读取复杂的数据结构并另存为 parquet?看来我做不到。
您使用的 java api 有限制
来自 spark 文档: http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#interoperating-with-rdds
Spark SQL 支持自动将 JavaBeans 的 RDD 转换为 DataFrame。使用反射获得的 BeanInfo 定义了 table 的模式。目前,Spark SQL 不支持包含嵌套或复杂类型(如列表或数组)的 JavaBeans。您可以通过创建一个 class 来创建一个 JavaBean,该 class 实现了 Serializable 并为其所有字段提供了 getter 和 setter。 使用 scala case classes 它将工作(更新为写入 parquet 格式)
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
case class Address(city:String, block:String);
case class Person(name:String,age:String, address:Address);
object Test2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
import sqlContext.implicits._
val people = sc.parallelize(List(Person("a", "b", Address("a", "b")), Person("c", "d", Address("c", "d"))));
val df = sqlContext.createDataFrame(people);
df.write.mode("overwrite").parquet("/tmp/people.parquet")
}
}