Spark - 可以将 MultiMap 转换为 JAVA 中的 DataFrame
Spark - Can a MultiMap be converted to a DataFrame in JAVA
我正在尝试将数十亿数据值的 MultiMap 转换为 Spark DataFrame 以进行 运行 计算,然后将结果写入 cassandra table。
我从以下 cassandra 查询和循环生成多图。如果有更好的方法将这些数据获取并操作到 DataFrame 中,我很乐意接受建议,就像我在循环中一样。
代码更新为答案:
//Build ResultSet from cassandra query for data manipulation.
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
//Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
stmt.setFetchSize(1000);
ResultSet results = session.execute(stmt);
// Get the Variables from each Row of Cassandra Data
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results){
// Column Names in Cassandra (Case Sensitive)
start_frequency = row.getDouble("Start_Frequency");
power = row.getFloat("Power");
bandwidth = row.getDouble("Bandwidth");
// Create Channel Power Buckets, place information into prepared statement binding, write to cassandra.
for(channel = 1.6000E8; channel <= channel_end; ){
if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
data.put(channel, power);
} // end if
channel+=increment;
} // end for
} // end "row" for
// Create Spark List for DataFrame
List<Value> values = data.asMap().entrySet()
.stream()
.flatMap(x -> x.getValue()
.stream()
.map(y -> new Value(x.getKey(), y)))
.collect(Collectors.toList());
// Create DataFrame and Calculate Results
sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
} // end session
} // End Compute
public class Value implements Serializable {
public Value(Double channel, Float power) {
this.channel = channel;
this.power = power;
}
Double channel;
Float power;
public void setChannel(Double channel) {
this.channel = channel;
}
public void setPower(Float power) {
this.power = power;
}
public Double getChannel() {
return channel;
}
public Float getPower() {
return power;
}
@Override
public String toString() {
return "[" +channel +","+power+"]";
}
}
示例多重映射具有 {Double}=[Float] 类型,其中每个 Double 可能有多个 Float 项目
示例
{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11]
我需要使用 spark 来获取其中每一项的最小值、最大值和平均值。例如对于第一个 1.50ED 将是最小 10,最大 20,平均 15。
我已经有了可以使用的代码,一旦我可以在临时table中获取它并作为数据帧运行:
queryMV.groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
如果能提供一些有关如何使用 JAVA 将多图转换为 DataFrame 的提示,我将不胜感激。我一直没能找到任何关于在 spark 中使用 multimaps 的文档。
我目前使用的解决方案执行初始查询,并使用 for 循环将原始数据写入新的 table,然后我可以直接映射到 temptable / dataframe 但是这需要很多时间,因为我必须在计算之前向 cassandra 写入数十亿行。我想用multimap之类的直接转换成spark来计算。
case class Output(a : Double ,b : Int )
val input = Map(1.50E8-> List(10, 20) , 1.51E8-> List( -10, -13, -14, -15 ), 1.52E8-> List(-10, -11)).toArray
val inputRdd = sc.parallelize(input)
val queryMV = inputRdd.flatMap(x=> x._2.map(y=> Output(x._1, y))).toDF
唉,Java parallelize
方法要么接受 T
的列表,要么接受 parallelizePairs
的 Tuple<K, V>
列表。所以你需要转换。虽然 createDataFrame
仅适用于 RDD 和 Scala Seq
并且需要一个模式(bean 或 StructType)。
让它更有趣 com.google.common.collect.ImmutableEntry
不可序列化,因此您需要在 Java 中进行转换,因此 @Pankaj Arora 解决方案的 Java 版本将无法工作除非您将转换逻辑移动到 Java。 IE。
public class Value implements Serializable {
public Value(Double a, Float b) {
this.a = a;
this.b = b;
}
Double a;
Float b;
public void setA(Double a) {
this.a = a;
}
public void setB(Float b) {
this.b = b;
}
public Double getA() {
return a;
}
public Float getB() {
return b;
}
public String toString() {
return "[" +a +","+b+"]";
}
}
Multimap<Double, Float> data = LinkedListMultimap.create();
data.put(1d, 1f);
data.put(1d, 2f);
data.put(2d, 3f);
List<Value> values = data.asMap().entrySet()
.stream()
.flatMap(x -> x.getValue()
.stream()
.map(y -> new Value(x.getKey(), y)))
.collect(Collectors.toList());
sqlContext.createDataFrame(sc.parallelize(values), Value.class).show();
考虑到您的编辑,我会考虑从头开始创建对象(而不是多地图)。
我正在尝试将数十亿数据值的 MultiMap 转换为 Spark DataFrame 以进行 运行 计算,然后将结果写入 cassandra table。
我从以下 cassandra 查询和循环生成多图。如果有更好的方法将这些数据获取并操作到 DataFrame 中,我很乐意接受建议,就像我在循环中一样。
代码更新为答案:
//Build ResultSet from cassandra query for data manipulation.
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
//Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
stmt.setFetchSize(1000);
ResultSet results = session.execute(stmt);
// Get the Variables from each Row of Cassandra Data
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results){
// Column Names in Cassandra (Case Sensitive)
start_frequency = row.getDouble("Start_Frequency");
power = row.getFloat("Power");
bandwidth = row.getDouble("Bandwidth");
// Create Channel Power Buckets, place information into prepared statement binding, write to cassandra.
for(channel = 1.6000E8; channel <= channel_end; ){
if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
data.put(channel, power);
} // end if
channel+=increment;
} // end for
} // end "row" for
// Create Spark List for DataFrame
List<Value> values = data.asMap().entrySet()
.stream()
.flatMap(x -> x.getValue()
.stream()
.map(y -> new Value(x.getKey(), y)))
.collect(Collectors.toList());
// Create DataFrame and Calculate Results
sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
} // end session
} // End Compute
public class Value implements Serializable {
public Value(Double channel, Float power) {
this.channel = channel;
this.power = power;
}
Double channel;
Float power;
public void setChannel(Double channel) {
this.channel = channel;
}
public void setPower(Float power) {
this.power = power;
}
public Double getChannel() {
return channel;
}
public Float getPower() {
return power;
}
@Override
public String toString() {
return "[" +channel +","+power+"]";
}
}
示例多重映射具有 {Double}=[Float] 类型,其中每个 Double 可能有多个 Float 项目
示例
{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11]
我需要使用 spark 来获取其中每一项的最小值、最大值和平均值。例如对于第一个 1.50ED 将是最小 10,最大 20,平均 15。
我已经有了可以使用的代码,一旦我可以在临时table中获取它并作为数据帧运行:
queryMV.groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
如果能提供一些有关如何使用 JAVA 将多图转换为 DataFrame 的提示,我将不胜感激。我一直没能找到任何关于在 spark 中使用 multimaps 的文档。
我目前使用的解决方案执行初始查询,并使用 for 循环将原始数据写入新的 table,然后我可以直接映射到 temptable / dataframe 但是这需要很多时间,因为我必须在计算之前向 cassandra 写入数十亿行。我想用multimap之类的直接转换成spark来计算。
case class Output(a : Double ,b : Int )
val input = Map(1.50E8-> List(10, 20) , 1.51E8-> List( -10, -13, -14, -15 ), 1.52E8-> List(-10, -11)).toArray
val inputRdd = sc.parallelize(input)
val queryMV = inputRdd.flatMap(x=> x._2.map(y=> Output(x._1, y))).toDF
唉,Java parallelize
方法要么接受 T
的列表,要么接受 parallelizePairs
的 Tuple<K, V>
列表。所以你需要转换。虽然 createDataFrame
仅适用于 RDD 和 Scala Seq
并且需要一个模式(bean 或 StructType)。
让它更有趣 com.google.common.collect.ImmutableEntry
不可序列化,因此您需要在 Java 中进行转换,因此 @Pankaj Arora 解决方案的 Java 版本将无法工作除非您将转换逻辑移动到 Java。 IE。
public class Value implements Serializable {
public Value(Double a, Float b) {
this.a = a;
this.b = b;
}
Double a;
Float b;
public void setA(Double a) {
this.a = a;
}
public void setB(Float b) {
this.b = b;
}
public Double getA() {
return a;
}
public Float getB() {
return b;
}
public String toString() {
return "[" +a +","+b+"]";
}
}
Multimap<Double, Float> data = LinkedListMultimap.create();
data.put(1d, 1f);
data.put(1d, 2f);
data.put(2d, 3f);
List<Value> values = data.asMap().entrySet()
.stream()
.flatMap(x -> x.getValue()
.stream()
.map(y -> new Value(x.getKey(), y)))
.collect(Collectors.toList());
sqlContext.createDataFrame(sc.parallelize(values), Value.class).show();
考虑到您的编辑,我会考虑从头开始创建对象(而不是多地图)。