RDD部分的scala操作
scala operations on part of RDD
我是 Scala 的新手,我正在尝试为一个项目做点什么:
我生成了一个RDD:RDD
[UserID1, Date1, Value1]
[UserID1, Date2, Value2]
[UserID1, Date3, Value3]
[UserID2, Date1, Value1]
[UserID3, Date1, Value1]
我希望运行这个RDD上的一个函数生成RDD
[UserID1, FunctionResult1, FunctionResult2]
[UserID1, FunctionResult3, FunctionResult4]
[UserID2, FunctionResult1, FunctionResult2]
此函数的工作方式应该是:
1、groupBy UserID,日期升序排列(我已经把日期格式化成INT:20150225.
2、取第一个日期和第二个日期,求它们之间的天数。
3、取first Value和Second Value,求它们的差值
将这些值记录到函数结果中,继续处理第二个日期和第三个日期以及第二个和第三个值。
如果输入是 5 行 x 3 列,结果应该是 4 行 x 3 列。
到目前为止,我已经尝试在 RDD 上执行 reduceByKey,但它只在输出中生成一行。所以我想知道是否有任何其他有效的方法可以做到这一点,也许没有循环?我当前的代码如下所示
val basicsearchprofile = basicsearch.map(x=>(x._1,(x._2,x._3).reduceByKey((a,b)=> funcdiff(a,b))
// x._1为User ID,x._2为Date,x._3为Value;
def funcdiff(a:(Date,Value),b:(Date,Value)):(Day,value) =
{
val diffdays = (b._1%100 - a._1%100) + ((b._1/100)%100)- (a._1/100)%100))*30 + ((a._1/10000)%100 - (b._1/10000)%100))*365 //difference between days
val diffvalue = Math.abs(a._2 - b._2)
}
(diffdays diffvalue)
}
我假设函数 funcdiff 的返回值成对减少了每个事件,并最终将其减少为单行?是否可以将函数 funcdiff 应用于第一行和第二行,记录答案;接下来将它应用于第二行和第三行......等等,这样返回的结果是 [ID, Datediff, Valuediff]
的 RDD
提前致谢
Spark 并行处理行。由于您必须执行 row2-row1、row3-row2,我认为您不能再并行工作了。所以你必须稍微忘记 Spark,使用普通的 Scala 并在单个节点上处理整个用户数据(尽管每个用户可以并行处理)。例如:
// First, group by user with Spark
case class Info(userId:String, date:Int, value:Int)
val infos=List(
Info("john",20150221,10),
Info("mary",20150221,11),
Info("john",20150222,12),
Info("mary",20150223,15),
Info("john",20150223,14),
Info("john",20150224,16),
Info("john",20150225,18),
Info("mary",20150225,17))
val infoRdd=sc.parallelize(infos)
val infoByIdRdd=infoRdd.map( info => (info.userId, info)).groupByKey()
// Then use plain Scala to process each user data
def infoDeltas(infos:List[Info]) = {
// Transform [Info1, Info2, Info3] into [(Info1,Info2),(Info2,Info3)]
val accZero:(Option[Info],List[(Info,Info)])=(None,List())
def accInfo(last:Option[Info], list:List[(Info,Info)], info:Info) = {
last match {
case None => (Some(info), list)
case Some(lastInfo) => (Some(info), list:+(lastInfo,info))
}
}
val infoIntervals=infos.foldLeft(accZero)(
(acc,info) => accInfo(acc._1, acc._2, info)
)._2
// Transform [(Info1,Info2),(Info2,Info3)] into [Info2-Info1,Info3-Info2]
infoIntervals.map(interval => interval match {case (before,after) => Info(after.userId,after.date-before.date,after.value-before.value)})
}
val infoDeltasByIdRdd=infoByIdRdd.mapValues(infos => infoDeltas(infos.toList))
我是 Scala 的新手,我正在尝试为一个项目做点什么:
我生成了一个RDD:RDD
[UserID1, Date1, Value1]
[UserID1, Date2, Value2]
[UserID1, Date3, Value3]
[UserID2, Date1, Value1]
[UserID3, Date1, Value1]
我希望运行这个RDD上的一个函数生成RDD
[UserID1, FunctionResult1, FunctionResult2]
[UserID1, FunctionResult3, FunctionResult4]
[UserID2, FunctionResult1, FunctionResult2]
此函数的工作方式应该是: 1、groupBy UserID,日期升序排列(我已经把日期格式化成INT:20150225.
2、取第一个日期和第二个日期,求它们之间的天数。
3、取first Value和Second Value,求它们的差值
将这些值记录到函数结果中,继续处理第二个日期和第三个日期以及第二个和第三个值。
如果输入是 5 行 x 3 列,结果应该是 4 行 x 3 列。
到目前为止,我已经尝试在 RDD 上执行 reduceByKey,但它只在输出中生成一行。所以我想知道是否有任何其他有效的方法可以做到这一点,也许没有循环?我当前的代码如下所示
val basicsearchprofile = basicsearch.map(x=>(x._1,(x._2,x._3).reduceByKey((a,b)=> funcdiff(a,b))
// x._1为User ID,x._2为Date,x._3为Value;
def funcdiff(a:(Date,Value),b:(Date,Value)):(Day,value) =
{
val diffdays = (b._1%100 - a._1%100) + ((b._1/100)%100)- (a._1/100)%100))*30 + ((a._1/10000)%100 - (b._1/10000)%100))*365 //difference between days
val diffvalue = Math.abs(a._2 - b._2)
}
(diffdays diffvalue)
}
我假设函数 funcdiff 的返回值成对减少了每个事件,并最终将其减少为单行?是否可以将函数 funcdiff 应用于第一行和第二行,记录答案;接下来将它应用于第二行和第三行......等等,这样返回的结果是 [ID, Datediff, Valuediff]
的 RDD提前致谢
Spark 并行处理行。由于您必须执行 row2-row1、row3-row2,我认为您不能再并行工作了。所以你必须稍微忘记 Spark,使用普通的 Scala 并在单个节点上处理整个用户数据(尽管每个用户可以并行处理)。例如:
// First, group by user with Spark
case class Info(userId:String, date:Int, value:Int)
val infos=List(
Info("john",20150221,10),
Info("mary",20150221,11),
Info("john",20150222,12),
Info("mary",20150223,15),
Info("john",20150223,14),
Info("john",20150224,16),
Info("john",20150225,18),
Info("mary",20150225,17))
val infoRdd=sc.parallelize(infos)
val infoByIdRdd=infoRdd.map( info => (info.userId, info)).groupByKey()
// Then use plain Scala to process each user data
def infoDeltas(infos:List[Info]) = {
// Transform [Info1, Info2, Info3] into [(Info1,Info2),(Info2,Info3)]
val accZero:(Option[Info],List[(Info,Info)])=(None,List())
def accInfo(last:Option[Info], list:List[(Info,Info)], info:Info) = {
last match {
case None => (Some(info), list)
case Some(lastInfo) => (Some(info), list:+(lastInfo,info))
}
}
val infoIntervals=infos.foldLeft(accZero)(
(acc,info) => accInfo(acc._1, acc._2, info)
)._2
// Transform [(Info1,Info2),(Info2,Info3)] into [Info2-Info1,Info3-Info2]
infoIntervals.map(interval => interval match {case (before,after) => Info(after.userId,after.date-before.date,after.value-before.value)})
}
val infoDeltasByIdRdd=infoByIdRdd.mapValues(infos => infoDeltas(infos.toList))