从Spark中的Cassandra的第一行和最后一行中减去值
Subtracting values from first and last row from Cassandra in Spark
我有这段代码,它从 Cassandra 获取 RDD,然后提取每个键的第一行和最后一行并将它们相减。
val rdd = sc.cassandraTable("keyspace","table")
.select("column1", "column2", "column3", "column4","column5")
.as((i:String, p:String, e:String, c:Double, a:java.util.Date) => ((i), (c, a, p, e)))
.groupByKey.mapValues(v => v.toList)
.cache
val finalValues = rdd.mapValues(v => v.head)
val initialValues = rdd.mapValues(v => v.last)
val valuesCombined = finalValues.join(initialValues)
val results = valuesCombined.map(v => (v._2._1._1 - v._2._2._1))
它的性能好还是有更好的解决方案?我不确定是否将整个数据集缓存在内存中。
首先 - 我假设 all
变量指的是名为 rdd
的变量?创建后,不需要使用join(性能开销很大),直接将每个元素映射到你需要的结果即可:
val results = all.mapValues(v => v.head - v.last).values
现在 - 因为我们只对 RDD 执行了一个动作,我们也可以去掉 cache()
.
groupByKey
打乱数据,不再保证分组值的顺序。它也相当昂贵。
如果您真的想在 RDDs
而不是 DataFrames
上操作并且排序是基于您可以使用 aggregateByKey
:
的日期
import scala.math.Ordering
type Record = (String, String, String, Double, java.util.Date)
val RecordOrd = Ordering.by[Record, java.util.Date](_._5)
val minRecord = ("", "", "", 0.0, new java.util.Date(Long.MinValue))
val maxRecord = ("", "", "", 0.0, new java.util.Date(Long.MaxValue))
def minMax(x: (Record, Record), y: (Record, Record)) = {
(RecordOrd.min(x._1, y._1), RecordOrd.max(x._2, y._2))
}
rdd.aggregateByKey((maxRecord, minRecord))(
(acc, x) => minMax(acc, (x, x)),
minMax
)
使用 DataFrames
你可以尝试这样的事情:
import org.apache.spark.sql.functions.{col, lag, lead, when, row_number, max}
import org.apache.spark.sql.expressions.Window
val partition = Seq("column1")
val order = Seq("column5")
val columns = Seq("column2", "column3", "column4","column5")
val w = Window
.partitionBy(partition.head, partition.tail: _*)
.orderBy(order.head, order.tail: _*)
// Lead / lag of row number to mark first / last row in the group
val rn_lag = lag(row_number.over(w), 1).over(w)
val rn_lead = lead(row_number.over(w), 1).over(w)
// Select value if first / last row in the group otherwise null
val firstColumns = columns.map(
c => when(rn_lag.isNull, col(c)).alias(s"${c}_first"))
val lastColumns = columns.map(
c => when(rn_lead.isNull, col(c)).alias(s"${c}_last"))
// Add columns with first / last vals
val expanded = df.select(
partition.map(col(_)) ++ firstColumns ++ lastColumns: _*)
// Aggregate to drop nulls
val aggExprs = expanded.columns.diff(partition).map(c => max(c).alias(c))
expanded.groupBy(partition.map(col(_)): _*).agg(aggExprs.head, aggExprs.tail: _*)
还有一些其他方法可以解决 DataFrames
的这个问题,包括订购 structs
和 DataSet
API。请参阅我对
的回答
我有这段代码,它从 Cassandra 获取 RDD,然后提取每个键的第一行和最后一行并将它们相减。
val rdd = sc.cassandraTable("keyspace","table")
.select("column1", "column2", "column3", "column4","column5")
.as((i:String, p:String, e:String, c:Double, a:java.util.Date) => ((i), (c, a, p, e)))
.groupByKey.mapValues(v => v.toList)
.cache
val finalValues = rdd.mapValues(v => v.head)
val initialValues = rdd.mapValues(v => v.last)
val valuesCombined = finalValues.join(initialValues)
val results = valuesCombined.map(v => (v._2._1._1 - v._2._2._1))
它的性能好还是有更好的解决方案?我不确定是否将整个数据集缓存在内存中。
首先 - 我假设 all
变量指的是名为 rdd
的变量?创建后,不需要使用join(性能开销很大),直接将每个元素映射到你需要的结果即可:
val results = all.mapValues(v => v.head - v.last).values
现在 - 因为我们只对 RDD 执行了一个动作,我们也可以去掉 cache()
.
groupByKey
打乱数据,不再保证分组值的顺序。它也相当昂贵。
如果您真的想在 RDDs
而不是 DataFrames
上操作并且排序是基于您可以使用 aggregateByKey
:
import scala.math.Ordering
type Record = (String, String, String, Double, java.util.Date)
val RecordOrd = Ordering.by[Record, java.util.Date](_._5)
val minRecord = ("", "", "", 0.0, new java.util.Date(Long.MinValue))
val maxRecord = ("", "", "", 0.0, new java.util.Date(Long.MaxValue))
def minMax(x: (Record, Record), y: (Record, Record)) = {
(RecordOrd.min(x._1, y._1), RecordOrd.max(x._2, y._2))
}
rdd.aggregateByKey((maxRecord, minRecord))(
(acc, x) => minMax(acc, (x, x)),
minMax
)
使用 DataFrames
你可以尝试这样的事情:
import org.apache.spark.sql.functions.{col, lag, lead, when, row_number, max}
import org.apache.spark.sql.expressions.Window
val partition = Seq("column1")
val order = Seq("column5")
val columns = Seq("column2", "column3", "column4","column5")
val w = Window
.partitionBy(partition.head, partition.tail: _*)
.orderBy(order.head, order.tail: _*)
// Lead / lag of row number to mark first / last row in the group
val rn_lag = lag(row_number.over(w), 1).over(w)
val rn_lead = lead(row_number.over(w), 1).over(w)
// Select value if first / last row in the group otherwise null
val firstColumns = columns.map(
c => when(rn_lag.isNull, col(c)).alias(s"${c}_first"))
val lastColumns = columns.map(
c => when(rn_lead.isNull, col(c)).alias(s"${c}_last"))
// Add columns with first / last vals
val expanded = df.select(
partition.map(col(_)) ++ firstColumns ++ lastColumns: _*)
// Aggregate to drop nulls
val aggExprs = expanded.columns.diff(partition).map(c => max(c).alias(c))
expanded.groupBy(partition.map(col(_)): _*).agg(aggExprs.head, aggExprs.tail: _*)
还有一些其他方法可以解决 DataFrames
的这个问题,包括订购 structs
和 DataSet
API。请参阅我对