Apache Spark Dataset API - Does not accept schema StructType
I have the following class that loads a headerless CSV file using the Spark Dataset API.
The problem I'm running into is that I can't get the SparkSession to accept the schema StructType that should define each column. The resulting DataFrame consists of unnamed columns of type String.
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class CsvReader implements java.io.Serializable {

    public CsvReader(StructType builder) {
        this.builder = builder;
    }

    private StructType builder;

    SparkConf conf = new SparkConf().setAppName("csvParquet").setMaster("local");
    // create Spark Context
    SparkContext context = new SparkContext(conf);
    // create Spark Session
    SparkSession sparkSession = new SparkSession(context);

    Dataset<Row> df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", false)
            //.option("inferSchema", true)
            .schema(builder)
            .load("/Users/Chris/Desktop/Meter_Geocode_Data.csv"); //TODO: CMD line arg

    public void printSchema() {
        System.out.println(builder.length());
        df.printSchema();
    }

    public void printData() {
        df.show();
    }

    public void printMeters() {
        df.select("meter").show();
    }

    public void printMeterCountByGeocode_result() {
        df.groupBy("geocode_result").count().show();
    }

    public Dataset getDataframe() {
        return df;
    }
}
The resulting DataFrame schema is:
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
|-- _c5: string (nullable = true)
|-- _c6: string (nullable = true)
|-- _c7: string (nullable = true)
|-- _c8: string (nullable = true)
|-- _c9: string (nullable = true)
|-- _c10: string (nullable = true)
|-- _c11: string (nullable = true)
|-- _c12: string (nullable = true)
|-- _c13: string (nullable = true)
The debugger shows that the 'builder' StructType is correctly defined:
0 = {StructField@4904} "StructField(geocode_result,DoubleType,false)"
1 = {StructField@4905} "StructField(meter,StringType,false)"
2 = {StructField@4906} "StructField(orig_easting,StringType,false)"
3 = {StructField@4907} "StructField(orig_northing,StringType,false)"
4 = {StructField@4908} "StructField(temetra_easting,StringType,false)"
5 = {StructField@4909} "StructField(temetra_northing,StringType,false)"
6 = {StructField@4910} "StructField(orig_address,StringType,false)"
7 = {StructField@4911} "StructField(orig_postcode,StringType,false)"
8 = {StructField@4912} "StructField(postcode_easting,StringType,false)"
9 = {StructField@4913} "StructField(postcode_northing,StringType,false)"
10 = {StructField@4914} "StructField(distance_calc_method,StringType,false)"
11 = {StructField@4915} "StructField(distance,StringType,false)"
12 = {StructField@4916} "StructField(geocoded_address,StringType,false)"
13 = {StructField@4917} "StructField(geocoded_postcode,StringType,false)"
What am I doing wrong? Any help is hugely appreciated!
If you want to initialize df from builder, you should build it in the constructor, or you can move it into a member function. As the class stands, the df field initializer runs before the constructor body, so builder is still null when .schema(builder) executes and no schema is applied.
Declare the variable Dataset<Row> df and move the block of code that reads the CSV file into the getDataframe() method, as shown below.
private Dataset<Row> df = null;

public Dataset getDataframe() {
    // By the time this method is called, the constructor has already
    // assigned builder, so the schema is actually applied.
    df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", false)
            //.option("inferSchema", true)
            .schema(builder)
            .load("src/main/java/resources/test.csv"); //TODO: CMD line arg
    return df;
}
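For reference, the schema passed to the constructor can be built with Spark's DataTypes factory methods. A minimal sketch; the field names and types come from the debugger output in the question, and the list is abbreviated:

import java.util.Arrays;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Field names and types as shown in the debugger output (first three only).
StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("geocode_result", DataTypes.DoubleType, false),
        DataTypes.createStructField("meter", DataTypes.StringType, false),
        DataTypes.createStructField("orig_easting", DataTypes.StringType, false)
        // ... remaining fields as listed in the question
));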
It can now be called like this:
CsvReader cr = new CsvReader(schema);
Dataset df = cr.getDataframe();
cr.printSchema();
I suggest you redesign the class. One option is to pass df as a parameter to the other methods. If you are using Spark 2.0, you don't need SparkConf; refer to the documentation for creating a SparkSession.
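For example, in Spark 2.x the session can be created directly through its builder, and the built-in csv source replaces the com.databricks.spark.csv package. A minimal sketch, assuming local mode as in the question and the schema StructType built above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Spark 2.x: no SparkConf/SparkContext needed.
SparkSession spark = SparkSession.builder()
        .appName("csvParquet")
        .master("local")
        .getOrCreate();

// The csv source is built into Spark 2.x; no external package is required.
Dataset<Row> df = spark.read()
        .option("header", false)
        .schema(schema) // the StructType built earlier
        .csv("src/main/java/resources/test.csv");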