映射键值对,基于它们在 Spark 中的值的相似性
Map key, value pair based on similarity of their value in Spark
我已经学习 Spark 几个星期了,目前我正在尝试使用 Scala 中的 Spark 和 Hadoop 根据他们的联系对几个项目或人员进行分组。例如,我想了解足球运动员如何根据他们的俱乐部历史联系起来。我的 "players" rdd 将是:
(John, FC Sion)
(Mike, FC Sion)
(Bobby, PSV Eindhoven)
(Hans, FC Sion)
我想要这样的 rdd:
(John, <Mike, Hans>)
(Mike, <John, Hans>)
(Bobby, <>)
(Hans, <Mike, John>)
我打算用地图来完成这个。
val splitClubs = players.map(player=> (player._1, parseTeammates(player._2, players)))
其中 parseTeammates 是一个函数,可以找到也为同一俱乐部效力的球员 (player._2)
// RDD is not a type, how can I insert rdd into a function?
def parseTeammates(club: String, rdd: RDD) : List[String] = {
// will generate a list of players that contains same "club" value
val playerList = rdd.filter(_._1 == club)
return playerList.values;
}
我收到编译错误,类型不匹配,因为函数应该是 return List[String] 而是 playerList.values returns org.apache.spark.rdd.RDD[List[String] ].谁能帮我以简单的形式(在我的例子中是 List[String])获得 RDD 的值?
此外,我认为有一种更优雅的方法可以解决这个问题,而不是创建一个单独的 RDD,然后在新的 RDD 中找到某个键,然后 return将该值作为列表
我认为您的 parseTeammates
方法在 RDD 的世界中有点偏离。当谈到处理 RDD 和可能真的非常大量的数据时,你不想做这种嵌套循环。请尝试重新组织您的数据。
下面的代码会得到你想要的
players.map{case(player, club) => (club, List(player))}
.reduceByKey(_++_)
.flatMap{case(_, list) =>list.zipWithIndex.map{case(player, index) => (player, list.take(index) ++ list.drop(index+1))}}
请注意,我首先根据他们效力的俱乐部组织数据,然后合并球员以产生您要查找的格式的结果。
希望对您有所帮助。
对@Glennie 解决方案的不同看法(IMO 对你最初的方法不合适的看法是正确的)。
TL;DR;
players.map { case (player, team) => (team, mutable.HashSet[String](player)) }
.reduceByKey(_++=_)
.flatMap {
case (team, players) => {
for (player <- players)
yield (player, players - player)
}
}
基本思路是相同的(构建一个由团队键入的队友列表,flatMap
这个结果)。但我建议使用其他构建块来获得相同的结果。这是否成功取决于品味和数据集的性能特征。
对 reduceByKey
的不同看法
此处,按键归约涉及将(玩家的)集合与一个或多个玩家连接起来。
如果我们采用原始代码:
players.map{case(player, club) => (club, List(player))}
.reduceByKey(_++_)
在内部,我们最终会调用类似的东西(从 scala 1.4 开始):
def add: (List[String], List[String]) => List[String] = _++_;
players.map { case (player, team) => (team, List(player)) }
.combineByKey(
// The first time we see a new team on each partition
(list: List[String]) => list,
// invoked each time we fusion a player in its team's list
// (e.g. map side combine)
add,
// invoked each time we fusion each team's partial lists
// (e.g. reduce side combine)
add)
这里的要点是 add
操作 (_++_
) 被调用了很多次。所以最好优化一下。
在这方面,我们知道 List
表现不佳,因为每个突变都需要将现有列表整个复制到另一个列表中。请注意:"poorly" 实际上可能无关紧要。如果您有数百万支球队,并且每支球队只有 20 名球员,那么 ++
性能可能与减少中涉及的其他火花计算相形见绌。
(在我的脑海中,还有更糟糕的情况:如果 List
变得非常大,看到它的序列化中涉及的一些操作是递归实现的,我们可能会遇到 Whosebug。我会必须检查一下)。
所以我们可能会从切换到可变集合中受益,就像这样:
players.map { case (player, team) => (team, mutable.ArrayBuffer[String](player)) }
.reduceByKey(_++=_)
- 我们现在有一个可变集合,为此优化了串联
- 我们使用
++=
而不是++
,这样每次融合两个现有集合时,我们甚至不必分配一个全新的集合
- 如果我们知道或数据集很好,我们可以预先调整缓冲区的大小以获得可预测的内存分配,并尽可能避免调整缓冲区大小。或相应地切换实现。
对 flatMap
的不同看法
使用可变集合的好处
最初的实现再次使用了广泛的列表操作,如 take
和 drop
,并结合了带索引的 zip。
可变集合的使用在此处的可读性方面为我们提供了更好的服务,因为我们可以替换 3 个不可变列表副本(take
、drop
、++
):
list.take(index) ++ list.drop(index+1)
只有一个(-
执行克隆)
list - list(index)
替代语法
我们还可以提供一个完全不同的实现,避免使用索引压缩来利用理解:
.flatMap {
case (team, players) => {
for (player <- players)
yield (player, players - player)
}
}
请注意,players - player
步骤涉及在列表中查找播放器。使用 ArrayBuffer
,这是一个 O(n) 操作。因此,如果我们沿着这条路走下去,我们可能会再次根据数据集更喜欢使用 mutable.HashSet
作为可变集合而不是数组缓冲区。
(我本来打算添加 前提是我们的球员名字没有重复 ,但这没关系,因为如果你在一个团队中有两个 "John" ,那么在你的 RDD 中为两个 John 设置两行是没有用的,它没有比一行多的意义。)
我已经学习 Spark 几个星期了,目前我正在尝试使用 Scala 中的 Spark 和 Hadoop 根据他们的联系对几个项目或人员进行分组。例如,我想了解足球运动员如何根据他们的俱乐部历史联系起来。我的 "players" rdd 将是:
(John, FC Sion)
(Mike, FC Sion)
(Bobby, PSV Eindhoven)
(Hans, FC Sion)
我想要这样的 rdd:
(John, <Mike, Hans>)
(Mike, <John, Hans>)
(Bobby, <>)
(Hans, <Mike, John>)
我打算用地图来完成这个。
val splitClubs = players.map(player=> (player._1, parseTeammates(player._2, players)))
其中 parseTeammates 是一个函数,可以找到也为同一俱乐部效力的球员 (player._2)
// RDD is not a type, how can I insert rdd into a function?
def parseTeammates(club: String, rdd: RDD) : List[String] = {
// will generate a list of players that contains same "club" value
val playerList = rdd.filter(_._1 == club)
return playerList.values;
}
我收到编译错误,类型不匹配,因为函数应该是 return List[String] 而是 playerList.values returns org.apache.spark.rdd.RDD[List[String] ].谁能帮我以简单的形式(在我的例子中是 List[String])获得 RDD 的值?
此外,我认为有一种更优雅的方法可以解决这个问题,而不是创建一个单独的 RDD,然后在新的 RDD 中找到某个键,然后 return将该值作为列表
我认为您的 parseTeammates
方法在 RDD 的世界中有点偏离。当谈到处理 RDD 和可能真的非常大量的数据时,你不想做这种嵌套循环。请尝试重新组织您的数据。
下面的代码会得到你想要的
players.map{case(player, club) => (club, List(player))}
.reduceByKey(_++_)
.flatMap{case(_, list) =>list.zipWithIndex.map{case(player, index) => (player, list.take(index) ++ list.drop(index+1))}}
请注意,我首先根据他们效力的俱乐部组织数据,然后合并球员以产生您要查找的格式的结果。
希望对您有所帮助。
对@Glennie 解决方案的不同看法(IMO 对你最初的方法不合适的看法是正确的)。
TL;DR;
players.map { case (player, team) => (team, mutable.HashSet[String](player)) }
.reduceByKey(_++=_)
.flatMap {
case (team, players) => {
for (player <- players)
yield (player, players - player)
}
}
基本思路是相同的(构建一个由团队键入的队友列表,flatMap
这个结果)。但我建议使用其他构建块来获得相同的结果。这是否成功取决于品味和数据集的性能特征。
对 reduceByKey
的不同看法
此处,按键归约涉及将(玩家的)集合与一个或多个玩家连接起来。 如果我们采用原始代码:
players.map{case(player, club) => (club, List(player))}
.reduceByKey(_++_)
在内部,我们最终会调用类似的东西(从 scala 1.4 开始):
def add: (List[String], List[String]) => List[String] = _++_;
players.map { case (player, team) => (team, List(player)) }
.combineByKey(
// The first time we see a new team on each partition
(list: List[String]) => list,
// invoked each time we fusion a player in its team's list
// (e.g. map side combine)
add,
// invoked each time we fusion each team's partial lists
// (e.g. reduce side combine)
add)
这里的要点是 add
操作 (_++_
) 被调用了很多次。所以最好优化一下。
在这方面,我们知道 List
表现不佳,因为每个突变都需要将现有列表整个复制到另一个列表中。请注意:"poorly" 实际上可能无关紧要。如果您有数百万支球队,并且每支球队只有 20 名球员,那么 ++
性能可能与减少中涉及的其他火花计算相形见绌。
(在我的脑海中,还有更糟糕的情况:如果 List
变得非常大,看到它的序列化中涉及的一些操作是递归实现的,我们可能会遇到 Whosebug。我会必须检查一下)。
所以我们可能会从切换到可变集合中受益,就像这样:
players.map { case (player, team) => (team, mutable.ArrayBuffer[String](player)) }
.reduceByKey(_++=_)
- 我们现在有一个可变集合,为此优化了串联
- 我们使用
++=
而不是++
,这样每次融合两个现有集合时,我们甚至不必分配一个全新的集合 - 如果我们知道或数据集很好,我们可以预先调整缓冲区的大小以获得可预测的内存分配,并尽可能避免调整缓冲区大小。或相应地切换实现。
对 flatMap
的不同看法
使用可变集合的好处
最初的实现再次使用了广泛的列表操作,如 take
和 drop
,并结合了带索引的 zip。
可变集合的使用在此处的可读性方面为我们提供了更好的服务,因为我们可以替换 3 个不可变列表副本(take
、drop
、++
):
list.take(index) ++ list.drop(index+1)
只有一个(-
执行克隆)
list - list(index)
替代语法
我们还可以提供一个完全不同的实现,避免使用索引压缩来利用理解:
.flatMap {
case (team, players) => {
for (player <- players)
yield (player, players - player)
}
}
请注意,players - player
步骤涉及在列表中查找播放器。使用 ArrayBuffer
,这是一个 O(n) 操作。因此,如果我们沿着这条路走下去,我们可能会再次根据数据集更喜欢使用 mutable.HashSet
作为可变集合而不是数组缓冲区。
(我本来打算添加 前提是我们的球员名字没有重复 ,但这没关系,因为如果你在一个团队中有两个 "John" ,那么在你的 RDD 中为两个 John 设置两行是没有用的,它没有比一行多的意义。)