Apache Spark flatMap 时间复杂度
Apache Spark flatMap time complexity
我一直在尝试寻找一种方法来计算字符串集在事务数据库中出现的次数(以分布式方式实现 Apriori 算法)。我目前的代码如下:
val cand_br = sc.broadcast(cand)
transactions.flatMap(trans => freq(trans, cand_br.value))
.reduceByKey(_ + _)
}
def freq(trans: Set[String], cand: Array[Set[String]]) : Array[(Set[String],Int)] = {
var res = ArrayBuffer[(Set[String],Int)]()
for (c <- cand) {
if (c.subsetOf(trans)) {
res += ((c,1))
}
}
return res.toArray
}
transactions 以 RDD[Set[String]]
开始,我正在尝试将其转换为 RDD[(K, V)
,cand
中的每个元素都为 K,每个元素出现的次数为 V交易列表中 cand
中的元素。
在 UI 上观看性能时,flatMap 阶段很快需要大约 3 分钟才能完成,而其余部分不到 1 毫秒。
transactions.count() ~= 88000
和 cand.length ~= 24000
了解我正在处理的数据。我尝试了不同的方法来保存数据,但我很确定这是我面临的算法问题。
是否有更优解来解决这个子问题?
PS: 我是 Scala / Spark 框架的新手,所以这段代码中可能有一些奇怪的结构
在这种情况下,可能要问的正确问题是:"what is the time complexity of this algorithm"。我认为这与Spark的flatMap操作无关。
粗略的 O 复杂度分析
给定 2 个大小为 m
和 n
的集合集合,该算法正在计算一个集合中有多少元素是另一个集合元素的子集,因此它看起来很复杂 m x n
。看得更深一层,我们还看到 'subsetOf' 与子集的元素数量呈线性关系。 x subSet y
== x forAll y
,所以实际上复杂度是 m x n x s
,其中 s
是被检查子集的基数。
也就是说,这个flatMap
操作的工作量很大。
并行
现在,回到 Spark,我们还可以观察到这个算法是令人尴尬的并行,我们可以利用 Spark 的功能来发挥我们的优势。
为了比较一些方法,我在 val cand = transactions.filter(_.size<4).collect
上加载了 'retail' 数据集 [1] 和 运行 算法。数据大小是问题的近邻:
- Transactions.count = 88162
- cand.size = 15451
本地模式下的一些比较运行:
- 瓦尼拉:1.2 分钟
- 将
transactions
分区增加到核心数 (8):33 秒
我还尝试了另一种实现方式,使用 cartesian
而不是 flatmap
:
transactions.cartesian(candRDD).map{case (tx,cd) => (cd,if (cd.subsetOf(tx)) 1 else 0)}.reduceByKey(_ + _).collect
但这导致运行时间更长,如 Spark UI 的前两行所示(笛卡尔和具有更多分区的笛卡尔):2.5 分钟
考虑到我只有 8 个可用的逻辑核心,超出这个数量也无济于事。
完整性检查:
有没有添加'Spark flatMap time complexity'?可能有一些,因为它涉及序列化闭包和拆包集合,但与正在执行的函数相比可以忽略不计。
让我们看看是否可以做得更好:我使用普通 scala 实现了相同的算法:
val resLocal = reduceByKey(transLocal.flatMap(t运行s => freq(t运行s, cand)))
其中 reduceByKey
操作是从 [2] 中获取的天真的实现
执行时间:3.67 秒。
Sparks 为您提供开箱即用的并行性。这个 impl 是完全顺序的,因此需要更长的时间才能完成。
上次完整性检查:一个简单的平面图操作:
transactions.flatMap(trans => Seq((trans, 1))).reduceByKey( _ + _).collect
执行时间:0.88秒
结论:
Spark 正在为您购买并行性和集群,此算法可以利用它。使用更多内核并相应地对输入数据进行分区。
flatmap
没问题。时间复杂度奖励归于其中的函数。
我一直在尝试寻找一种方法来计算字符串集在事务数据库中出现的次数(以分布式方式实现 Apriori 算法)。我目前的代码如下:
val cand_br = sc.broadcast(cand)
transactions.flatMap(trans => freq(trans, cand_br.value))
.reduceByKey(_ + _)
}
def freq(trans: Set[String], cand: Array[Set[String]]) : Array[(Set[String],Int)] = {
var res = ArrayBuffer[(Set[String],Int)]()
for (c <- cand) {
if (c.subsetOf(trans)) {
res += ((c,1))
}
}
return res.toArray
}
transactions 以 RDD[Set[String]]
开始,我正在尝试将其转换为 RDD[(K, V)
,cand
中的每个元素都为 K,每个元素出现的次数为 V交易列表中 cand
中的元素。
在 UI 上观看性能时,flatMap 阶段很快需要大约 3 分钟才能完成,而其余部分不到 1 毫秒。
transactions.count() ~= 88000
和 cand.length ~= 24000
了解我正在处理的数据。我尝试了不同的方法来保存数据,但我很确定这是我面临的算法问题。
是否有更优解来解决这个子问题?
PS: 我是 Scala / Spark 框架的新手,所以这段代码中可能有一些奇怪的结构
在这种情况下,可能要问的正确问题是:"what is the time complexity of this algorithm"。我认为这与Spark的flatMap操作无关。
粗略的 O 复杂度分析
给定 2 个大小为 m
和 n
的集合集合,该算法正在计算一个集合中有多少元素是另一个集合元素的子集,因此它看起来很复杂 m x n
。看得更深一层,我们还看到 'subsetOf' 与子集的元素数量呈线性关系。 x subSet y
== x forAll y
,所以实际上复杂度是 m x n x s
,其中 s
是被检查子集的基数。
也就是说,这个flatMap
操作的工作量很大。
并行
现在,回到 Spark,我们还可以观察到这个算法是令人尴尬的并行,我们可以利用 Spark 的功能来发挥我们的优势。
为了比较一些方法,我在 val cand = transactions.filter(_.size<4).collect
上加载了 'retail' 数据集 [1] 和 运行 算法。数据大小是问题的近邻:
- Transactions.count = 88162
- cand.size = 15451
本地模式下的一些比较运行:
- 瓦尼拉:1.2 分钟
- 将
transactions
分区增加到核心数 (8):33 秒
我还尝试了另一种实现方式,使用 cartesian
而不是 flatmap
:
transactions.cartesian(candRDD).map{case (tx,cd) => (cd,if (cd.subsetOf(tx)) 1 else 0)}.reduceByKey(_ + _).collect
但这导致运行时间更长,如 Spark UI 的前两行所示(笛卡尔和具有更多分区的笛卡尔):2.5 分钟
考虑到我只有 8 个可用的逻辑核心,超出这个数量也无济于事。
完整性检查:
有没有添加'Spark flatMap time complexity'?可能有一些,因为它涉及序列化闭包和拆包集合,但与正在执行的函数相比可以忽略不计。
让我们看看是否可以做得更好:我使用普通 scala 实现了相同的算法:
val resLocal = reduceByKey(transLocal.flatMap(t运行s => freq(t运行s, cand)))
其中 reduceByKey
操作是从 [2] 中获取的天真的实现
执行时间:3.67 秒。
Sparks 为您提供开箱即用的并行性。这个 impl 是完全顺序的,因此需要更长的时间才能完成。
上次完整性检查:一个简单的平面图操作:
transactions.flatMap(trans => Seq((trans, 1))).reduceByKey( _ + _).collect
执行时间:0.88秒
结论:
Spark 正在为您购买并行性和集群,此算法可以利用它。使用更多内核并相应地对输入数据进行分区。
flatmap
没问题。时间复杂度奖励归于其中的函数。