如何强制spark在本地执行reduction
How to force spark to perform reduction locally
我正在寻找一种技巧来强制 Spark 在工作内核执行的所有任务之间在本地执行归约操作,然后再为所有任务执行归约操作。
事实上,我的驱动程序节点和网络带宽似乎因为大任务结果 (=400MB) 而过载。
val arg0 = sc.broadcast(fs.read(0, 4))
val arg1 = sc.broadcast(fs.read(1, 4))
val arg2 = fs.read(5, 4)
val index = info.sc.parallelize(0.toLong to 10000-1 by 1)
val mapres = index.map{ x => function(arg0.value, arg1.value, x, arg2) }
val output = mapres.reduce(Util.bitor)
驱动程序按处理器内核分配 1 个分区,因此按工作程序分配 8 个分区。
因为reduce
applies reduction locally for each partition. Only the final merge is applied on the driver没有什么好强求的。更不用说 400MB 在任何合理的配置中都不应该成为问题。
如果你想对 worker 执行更多工作,你可以使用 treeReduce
尽管 8 个分区几乎没有任何好处。
我正在寻找一种技巧来强制 Spark 在工作内核执行的所有任务之间在本地执行归约操作,然后再为所有任务执行归约操作。 事实上,我的驱动程序节点和网络带宽似乎因为大任务结果 (=400MB) 而过载。
val arg0 = sc.broadcast(fs.read(0, 4))
val arg1 = sc.broadcast(fs.read(1, 4))
val arg2 = fs.read(5, 4)
val index = info.sc.parallelize(0.toLong to 10000-1 by 1)
val mapres = index.map{ x => function(arg0.value, arg1.value, x, arg2) }
val output = mapres.reduce(Util.bitor)
驱动程序按处理器内核分配 1 个分区,因此按工作程序分配 8 个分区。
因为reduce
applies reduction locally for each partition. Only the final merge is applied on the driver没有什么好强求的。更不用说 400MB 在任何合理的配置中都不应该成为问题。
如果你想对 worker 执行更多工作,你可以使用 treeReduce
尽管 8 个分区几乎没有任何好处。