Spark - 从 RDD 或 DataFrame 将列映射到 JAVA 中的变量
Spark - Mapping columns to variables in JAVA from an RDD or DataFrame
我正在尝试把 Spark RDD 中 Cassandra 行的各列映射到变量中,以便在 Spark 中对它们进行运算,但始终无法把这些列放入变量。我的代码如下:
// Question snippet: read the "start_frequency", "bandwidth" and "power" columns
// from cassandraTable("model", "reports"), mapped onto MeasuredValue objects.
JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("model", "reports", mapRowTo (MeasuredValue.class))
.select("start_frequency","bandwidth", "power");
// Attempt to expand each row into a list of Value(channel, power) entries via a lambda.
JavaRDD<Value> valueRdd = rdd.flatMap(row-> {
// NOTE(review): the MeasuredValue class shown with this snippet declares
// getStart_frequency(), not getStartFrequency() — confirm which accessor is intended.
double start_frequency = row.getStartFrequency();
float power = row.getPower();
double bandwidth = row.getBandwidth();
List<Value> list = new ArrayList<Value>();
// Create Channel Power Buckets
// NOTE(review): channel, channel_end and increment are not declared anywhere in this
// snippet; they presumably live outside the lambda — TODO confirm.
for(channel = 1.6000E8; channel <= channel_end; ){
// Keep only channels inside [start_frequency, start_frequency + bandwidth].
if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
list.add(new Value(channel, power));
} // end if
channel+=increment;
} // end for
// NOTE(review): the lambda body ends here without a `return list;` statement.
})
我的类是这样的:
/**
 * Serializable bean holding one (channel, power) pair.
 *
 * <p>Each property has a getter and a setter, and {@link #toString()} renders
 * the pair as {@code [channel,power]}.
 */
public class Value implements Serializable {
    Double channel;
    Float power;

    /**
     * Creates a pair from the given channel and power.
     *
     * @param channel the channel value to store
     * @param power the power value to store
     */
    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }

    /** @return the stored channel */
    public Double getChannel() {
        return this.channel;
    }

    /** @param channel the new channel value */
    public void setChannel(Double channel) {
        this.channel = channel;
    }

    /** @return the stored power */
    public Float getPower() {
        return this.power;
    }

    /** @param power the new power value */
    public void setPower(Float power) {
        this.power = power;
    }

    /** @return the pair formatted as {@code [channel,power]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("[").append(this.channel);
        text.append(",").append(this.power);
        text.append("]");
        return text.toString();
    }
}
/**
 * Serializable bean with {@code start_frequency}, {@code bandwidth} and
 * {@code power} properties, each exposed through a getter/setter pair.
 *
 * <p>The accessor names are kept exactly as they were; the snippets on this page
 * pass this class to {@code mapRowTo(MeasuredValue.class)}, which presumably
 * relies on those names — verify before renaming anything.
 */
public static class MeasuredValue implements Serializable {
    private double start_frequency;
    private double bandwidth;
    private float power;

    /** Creates an instance with all fields at their Java default values. */
    public MeasuredValue() {
    }

    /** @return the stored start frequency */
    public double getStart_frequency() {
        return this.start_frequency;
    }

    /** @param start_frequency the new start frequency */
    public void setStart_frequency(double start_frequency) {
        this.start_frequency = start_frequency;
    }

    /** @return the stored bandwidth */
    public double getBandwidth() {
        return this.bandwidth;
    }

    /** @param bandwidth the new bandwidth */
    public void setBandwidth(double bandwidth) {
        this.bandwidth = bandwidth;
    }

    /** @return the stored power */
    public float getPower() {
        return this.power;
    }

    /** @param power the new power */
    public void setPower(float power) {
        this.power = power;
    }
}
我用 lambda 对行做 flatMap 的写法似乎有问题,因为我收到了以下错误:
method flatMap in class AbstractJavaRDDLike cannot be applied
to given types; required: FlatMapFunction found:
(row)->{d[...];}} reason: cannot infer type-variable(s) U (argument
mismatch; bad return type in lambda expression missing return value)
另外,在 "Create Channel Power Buckets" 循环中,我还遇到了以下错误:
"local variable referenced from a lambda expression must be final
or effectively final"
如果可以用 DataFrame 来实现这一点,我也很想看看相应的代码。
最终找到的答案如下:
// Read the rows of cassandraTable("SB1000_47130646", "Measured_Value") as MeasuredValue objects.
JavaRDD<MeasuredValue> rdd = javaFunctions(sc)
        .cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class));

// Expand every row into one Value per grid channel that lies inside
// [start_frequency, start_frequency + bandwidth]; each Value carries the row's power.
JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>() {
    @Override
    public Iterable<Value> call(MeasuredValue row) throws Exception {
        final double lowerEdge = row.getStart_frequency();
        final float power = row.getPower();
        // The upper edge is the same for every channel, so it is computed once.
        final double upperEdge = lowerEdge + row.getBandwidth();

        // Channel grid: first channel, last channel (inclusive) and spacing.
        final double firstChannel = 1.6000E8;
        final double lastChannel = 1.6159E8;
        final double step = 5000;

        List<Value> buckets = new ArrayList<Value>();
        double channel = firstChannel;
        while (channel <= lastChannel) {
            boolean insideBand = (channel >= lowerEdge) && (channel <= upperEdge);
            if (insideBand) {
                buckets.add(new Value(channel, power));
            }
            channel += step;
        }
        return buckets;
    }
});

// Group by channel, compute min/max/avg of power, and append the result through the
// "org.apache.spark.sql.cassandra" format with table "results" and keyspace "model".
sqlContext.createDataFrame(valueRdd, Value.class)
        .groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write()
        .mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();
} // end session
} // End Compute
/**
 * Serializable bean holding one (channel, power) pair.
 *
 * <p>Each property has a getter and a setter, and {@link #toString()} renders
 * the pair as {@code [channel,power]}.
 */
public class Value implements Serializable {
    Double channel;
    Float power;

    /**
     * Creates a pair from the given channel and power.
     *
     * @param channel the channel value to store
     * @param power the power value to store
     */
    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }

    /** @return the stored channel */
    public Double getChannel() {
        return this.channel;
    }

    /** @param channel the new channel value */
    public void setChannel(Double channel) {
        this.channel = channel;
    }

    /** @return the stored power */
    public Float getPower() {
        return this.power;
    }

    /** @param power the new power value */
    public void setPower(Float power) {
        this.power = power;
    }

    /** @return the pair formatted as {@code [channel,power]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("[").append(this.channel);
        text.append(",").append(this.power);
        text.append("]");
        return text.toString();
    }
}
/**
 * Serializable bean with {@code start_frequency}, {@code bandwidth} and
 * {@code power} properties, each exposed through a getter/setter pair.
 *
 * <p>The accessor names are kept exactly as they were; the snippets on this page
 * pass this class to {@code mapRowTo(MeasuredValue.class)}, which presumably
 * relies on those names — verify before renaming anything.
 */
public static class MeasuredValue implements Serializable {
    private double start_frequency;
    private double bandwidth;
    private float power;

    /** Creates an instance with all fields at their Java default values. */
    public MeasuredValue() {
    }

    /** @return the stored start frequency */
    public double getStart_frequency() {
        return this.start_frequency;
    }

    /** @param start_frequency the new start frequency */
    public void setStart_frequency(double start_frequency) {
        this.start_frequency = start_frequency;
    }

    /** @return the stored bandwidth */
    public double getBandwidth() {
        return this.bandwidth;
    }

    /** @param bandwidth the new bandwidth */
    public void setBandwidth(double bandwidth) {
        this.bandwidth = bandwidth;
    }

    /** @return the stored power */
    public float getPower() {
        return this.power;
    }

    /** @param power the new power */
    public void setPower(float power) {
        this.power = power;
    }
}
我正在尝试把 Spark RDD 中 Cassandra 行的各列映射到变量中,以便在 Spark 中对它们进行运算,但始终无法把这些列放入变量。我的代码如下:
// Question snippet: read the "start_frequency", "bandwidth" and "power" columns
// from cassandraTable("model", "reports"), mapped onto MeasuredValue objects.
JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("model", "reports", mapRowTo (MeasuredValue.class))
.select("start_frequency","bandwidth", "power");
// Attempt to expand each row into a list of Value(channel, power) entries via a lambda.
JavaRDD<Value> valueRdd = rdd.flatMap(row-> {
// NOTE(review): the MeasuredValue class shown with this snippet declares
// getStart_frequency(), not getStartFrequency() — confirm which accessor is intended.
double start_frequency = row.getStartFrequency();
float power = row.getPower();
double bandwidth = row.getBandwidth();
List<Value> list = new ArrayList<Value>();
// Create Channel Power Buckets
// NOTE(review): channel, channel_end and increment are not declared anywhere in this
// snippet; they presumably live outside the lambda — TODO confirm.
for(channel = 1.6000E8; channel <= channel_end; ){
// Keep only channels inside [start_frequency, start_frequency + bandwidth].
if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
list.add(new Value(channel, power));
} // end if
channel+=increment;
} // end for
// NOTE(review): the lambda body ends here without a `return list;` statement.
})
我的类是这样的:
/**
 * Serializable bean holding one (channel, power) pair.
 *
 * <p>Each property has a getter and a setter, and {@link #toString()} renders
 * the pair as {@code [channel,power]}.
 */
public class Value implements Serializable {
    Double channel;
    Float power;

    /**
     * Creates a pair from the given channel and power.
     *
     * @param channel the channel value to store
     * @param power the power value to store
     */
    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }

    /** @return the stored channel */
    public Double getChannel() {
        return this.channel;
    }

    /** @param channel the new channel value */
    public void setChannel(Double channel) {
        this.channel = channel;
    }

    /** @return the stored power */
    public Float getPower() {
        return this.power;
    }

    /** @param power the new power value */
    public void setPower(Float power) {
        this.power = power;
    }

    /** @return the pair formatted as {@code [channel,power]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("[").append(this.channel);
        text.append(",").append(this.power);
        text.append("]");
        return text.toString();
    }
}
/**
 * Serializable bean with {@code start_frequency}, {@code bandwidth} and
 * {@code power} properties, each exposed through a getter/setter pair.
 *
 * <p>The accessor names are kept exactly as they were; the snippets on this page
 * pass this class to {@code mapRowTo(MeasuredValue.class)}, which presumably
 * relies on those names — verify before renaming anything.
 */
public static class MeasuredValue implements Serializable {
    private double start_frequency;
    private double bandwidth;
    private float power;

    /** Creates an instance with all fields at their Java default values. */
    public MeasuredValue() {
    }

    /** @return the stored start frequency */
    public double getStart_frequency() {
        return this.start_frequency;
    }

    /** @param start_frequency the new start frequency */
    public void setStart_frequency(double start_frequency) {
        this.start_frequency = start_frequency;
    }

    /** @return the stored bandwidth */
    public double getBandwidth() {
        return this.bandwidth;
    }

    /** @param bandwidth the new bandwidth */
    public void setBandwidth(double bandwidth) {
        this.bandwidth = bandwidth;
    }

    /** @return the stored power */
    public float getPower() {
        return this.power;
    }

    /** @param power the new power */
    public void setPower(float power) {
        this.power = power;
    }
}
我用 lambda 对行做 flatMap 的写法似乎有问题,因为我收到了以下错误:
method flatMap in class AbstractJavaRDDLike cannot be applied to given types; required: FlatMapFunction found: (row)->{d[...];}} reason: cannot infer type-variable(s) U (argument mismatch; bad return type in lambda expression missing return value)
另外,在 "Create Channel Power Buckets" 循环中,我还遇到了以下错误:"local variable referenced from a lambda expression must be final or effectively final"
如果可以用 DataFrame 来实现这一点,我也很想看看相应的代码。
最终找到的答案如下:
// Read the rows of cassandraTable("SB1000_47130646", "Measured_Value") as MeasuredValue objects.
JavaRDD<MeasuredValue> rdd = javaFunctions(sc)
        .cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class));

// Expand every row into one Value per grid channel that lies inside
// [start_frequency, start_frequency + bandwidth]; each Value carries the row's power.
JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>() {
    @Override
    public Iterable<Value> call(MeasuredValue row) throws Exception {
        final double lowerEdge = row.getStart_frequency();
        final float power = row.getPower();
        // The upper edge is the same for every channel, so it is computed once.
        final double upperEdge = lowerEdge + row.getBandwidth();

        // Channel grid: first channel, last channel (inclusive) and spacing.
        final double firstChannel = 1.6000E8;
        final double lastChannel = 1.6159E8;
        final double step = 5000;

        List<Value> buckets = new ArrayList<Value>();
        double channel = firstChannel;
        while (channel <= lastChannel) {
            boolean insideBand = (channel >= lowerEdge) && (channel <= upperEdge);
            if (insideBand) {
                buckets.add(new Value(channel, power));
            }
            channel += step;
        }
        return buckets;
    }
});

// Group by channel, compute min/max/avg of power, and append the result through the
// "org.apache.spark.sql.cassandra" format with table "results" and keyspace "model".
sqlContext.createDataFrame(valueRdd, Value.class)
        .groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write()
        .mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();
} // end session
} // End Compute
/**
 * Serializable bean holding one (channel, power) pair.
 *
 * <p>Each property has a getter and a setter, and {@link #toString()} renders
 * the pair as {@code [channel,power]}.
 */
public class Value implements Serializable {
    Double channel;
    Float power;

    /**
     * Creates a pair from the given channel and power.
     *
     * @param channel the channel value to store
     * @param power the power value to store
     */
    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }

    /** @return the stored channel */
    public Double getChannel() {
        return this.channel;
    }

    /** @param channel the new channel value */
    public void setChannel(Double channel) {
        this.channel = channel;
    }

    /** @return the stored power */
    public Float getPower() {
        return this.power;
    }

    /** @param power the new power value */
    public void setPower(Float power) {
        this.power = power;
    }

    /** @return the pair formatted as {@code [channel,power]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("[").append(this.channel);
        text.append(",").append(this.power);
        text.append("]");
        return text.toString();
    }
}
/**
 * Serializable bean with {@code start_frequency}, {@code bandwidth} and
 * {@code power} properties, each exposed through a getter/setter pair.
 *
 * <p>The accessor names are kept exactly as they were; the snippets on this page
 * pass this class to {@code mapRowTo(MeasuredValue.class)}, which presumably
 * relies on those names — verify before renaming anything.
 */
public static class MeasuredValue implements Serializable {
    private double start_frequency;
    private double bandwidth;
    private float power;

    /** Creates an instance with all fields at their Java default values. */
    public MeasuredValue() {
    }

    /** @return the stored start frequency */
    public double getStart_frequency() {
        return this.start_frequency;
    }

    /** @param start_frequency the new start frequency */
    public void setStart_frequency(double start_frequency) {
        this.start_frequency = start_frequency;
    }

    /** @return the stored bandwidth */
    public double getBandwidth() {
        return this.bandwidth;
    }

    /** @param bandwidth the new bandwidth */
    public void setBandwidth(double bandwidth) {
        this.bandwidth = bandwidth;
    }

    /** @return the stored power */
    public float getPower() {
        return this.power;
    }

    /** @param power the new power */
    public void setPower(float power) {
        this.power = power;
    }
}