Mahout spark-itemsimilarity saveAsTextFile 最后阶段非常慢

Mahout spark-itemsimilarity saveAsTextFile final stage is very slow

我在 HDP 上以 YARN 客户端模式在 Spark 1.5.1 上使用 Mahout 0.11.0 2.2 来自 cli 的集群。我的输入大约是 325Mb,分成 1000 个部分文件。这是我调用的确切命令:

$MAHOUT_HOME/bin/mahout spark-itemsimilarity --input unit-similarity-dump/bpc1 --output mahout-cooccurrence-output4 --maxPrefs 200 --maxSimilaritiesPerItem 100 --master yarn-client --sparkExecutorMem 10g -D:spark.yarn.executor.memoryOverhead=1024 -D:spark.executor.cores=5 -D:spark.executor.instances=50 -D:spark.yarn.am.memory=4g -D:spark.yarn.am.memoryOverhead=512 -D:spark.yarn.am.cores=2 -D:spark.driver.memory=20g -D:spark.yarn.driver.memoryOverhead=2048 -D:spark.driver.cores=4 -D:spark.driver.maxResultSize=10g -D:spark.yarn.queue=product -D:hdp.version=2.2.6.0-2800

应用程序运行良好,直到最后阶段 saveAsTextFile 被调用。此时,任务逐渐停止,每个任务都需要 45 分钟到一个小时才能成功。仔细检查后,似乎每个任务都在读取 MapPartitionsRDD 的所有 1000 个分区,直觉上我认为这一定是性能问题的根源。这些分区在所有执行程序中均匀分布,因此我认为每个任务都需要从不是其直接父级的 n-1 个执行程序请求所有分区。

优化此应用程序的最佳方法是什么?更少的分区,所以要请求的远程数据更少?更少的执行者,所以每个任务的本地化数据百分比更高?尝试为 RDD 强制设置更高的复制因子?现在它似乎默认为 存储级别:内存反序列化 1x 复制,100% 缓存。

为清楚起见,这里是舞台详细信息的屏幕截图:saveAsTextFile stage

提前感谢您的任何见解。

更新:

我尝试只使用 1 个多核执行器(即任务),尽管所有 RDD 分区都存在于单个本地执行器上,但性能仍然很慢。我认为唯一的罪魁祸首是最终 saveAsTextFile DAG 中 reduceByKey 造成的洗牌。

第二次更新:

我也尝试过只使用 1 个输入分区,而我以前使用过 100 个甚至 1000 个。结果非常相似,总结如下 here。为清楚起见,在此 运行 中,我使用了一个 20g 的 5 核执行器。然而,这种方法确实导致聚合资源分配减少了大约一个数量级(以 MB 秒和 vcore 秒为单位)。这可能是由于先前 运行 中执行程序和线程的过度分配,并暗示瓶颈可能不受计算限制。

不确定我是否遵循了上面的所有描述。有一个 BiMap 双向字典,可以将列和行 id 从序数 Mahout id 转换为字符串外部 id。这些数据结构在内存中,每种类型的 id (row/column) 相当于 2 个哈希图。 reduceByKey 适用于 Mahout id,因此转换仅在输入和输出期间发生。这些数据结构被读入驱动程序,然后广播到每个节点,每个节点只制作一份副本,其中 BiMap(实际上是 BiDictionary)由执行程序共享。

分区默认设置为"auto"。 Mahout 11 中的哪个值应该是针对同现计算优化的值,这就是事情 "hum along" 的原因。

最后一步 reduceByKey 获取剩余矩阵(行键、向量)中的每个值,将键和向量元素的每个 id 转换回字符串并写出将文本并行发送到文件。

坦率地说,我发现文本文件的读写在很大程度上依赖于手动调整。我的主要经验是并行读取,其中 Spark 在分区之前读取所有文件统计信息——全部。与在阅读之前获取 1000 个文件并将它们连接成一个文件相比,这非常慢(自己尝试一下,他们可能已经解决了这个问题)。

我觉得你需要更好的 saveAsTextFilesaveAsTextFile 的手动调整可能最好使用您自己的分布式操作来完成,foreach 在根据您自己的参数对 RDD 进行一些重新分区后工作。请参阅此处的文档:http://spark.apache.org/docs/latest/programming-guide.html#printing-elements-of-an-rdd

如果您想进行实验,子类化 TextDelimitedIndexedDatasetReaderWriter 以提供您自己的编写器特征。 Mahout 有一个 mapBlock 操作也可以使用。它向每个 mapBlock 传递一个行块,您可以使用 BiMap 编写它来转换 ids。

希望听到 Mahout 用户列表中的任何结果。