Mahout spark-itemsimilarity saveAsTextFile 最后阶段非常慢
Mahout spark-itemsimilarity saveAsTextFile final stage is very slow
我在 HDP 上以 YARN 客户端模式在 Spark 1.5.1 上使用 Mahout 0.11.0 2.2 来自 cli 的集群。我的输入大约是 325Mb,分成 1000 个部分文件。这是我调用的确切命令:
$MAHOUT_HOME/bin/mahout spark-itemsimilarity --input unit-similarity-dump/bpc1 --output mahout-cooccurrence-output4 --maxPrefs 200 --maxSimilaritiesPerItem 100 --master yarn-client --sparkExecutorMem 10g -D:spark.yarn.executor.memoryOverhead=1024 -D:spark.executor.cores=5 -D:spark.executor.instances=50 -D:spark.yarn.am.memory=4g -D:spark.yarn.am.memoryOverhead=512 -D:spark.yarn.am.cores=2 -D:spark.driver.memory=20g -D:spark.yarn.driver.memoryOverhead=2048 -D:spark.driver.cores=4 -D:spark.driver.maxResultSize=10g -D:spark.yarn.queue=product -D:hdp.version=2.2.6.0-2800
应用程序运行良好,直到最后阶段 saveAsTextFile
被调用。此时,任务逐渐停止,每个任务都需要 45 分钟到一个小时才能成功。仔细检查后,似乎每个任务都在读取 MapPartitionsRDD 的所有 1000 个分区,直觉上我认为这一定是性能问题的根源。这些分区在所有执行程序中均匀分布,因此我认为每个任务都需要从不是其直接父级的 n-1 个执行程序请求所有分区。
优化此应用程序的最佳方法是什么?更少的分区,所以要请求的远程数据更少?更少的执行者,所以每个任务的本地化数据百分比更高?尝试为 RDD 强制设置更高的复制因子?现在它似乎默认为 存储级别:内存反序列化 1x 复制,100% 缓存。
为清楚起见,这里是舞台详细信息的屏幕截图:saveAsTextFile stage
提前感谢您的任何见解。
更新:
我尝试只使用 1 个多核执行器(即任务),尽管所有 RDD 分区都存在于单个本地执行器上,但性能仍然很慢。我认为唯一的罪魁祸首是最终 saveAsTextFile
DAG 中 reduceByKey
造成的洗牌。
第二次更新:
我也尝试过只使用 1 个输入分区,而我以前使用过 100 个甚至 1000 个。结果非常相似,总结如下 here。为清楚起见,在此 运行 中,我使用了一个 20g 的 5 核执行器。然而,这种方法确实导致聚合资源分配减少了大约一个数量级(以 MB 秒和 vcore 秒为单位)。这可能是由于先前 运行 中执行程序和线程的过度分配,并暗示瓶颈可能不受计算限制。
不确定我是否遵循了上面的所有描述。有一个 BiMap 双向字典,可以将列和行 id 从序数 Mahout id 转换为字符串外部 id。这些数据结构在内存中,每种类型的 id (row/column) 相当于 2 个哈希图。 reduceByKey 适用于 Mahout id,因此转换仅在输入和输出期间发生。这些数据结构被读入驱动程序,然后广播到每个节点,每个节点只制作一份副本,其中 BiMap(实际上是 BiDictionary)由执行程序共享。
分区默认设置为"auto"。 Mahout 11 中的哪个值应该是针对同现计算优化的值,这就是事情 "hum along" 的原因。
最后一步在 reduceByKey 获取剩余矩阵(行键、向量)中的每个值,将键和向量元素的每个 id 转换回字符串并写出将文本并行发送到文件。
坦率地说,我发现文本文件的读写在很大程度上依赖于手动调整。我的主要经验是并行读取,其中 Spark 在分区之前读取所有文件统计信息——全部。与在阅读之前获取 1000 个文件并将它们连接成一个文件相比,这非常慢(自己尝试一下,他们可能已经解决了这个问题)。
我觉得你需要更好的 saveAsTextFile
。 saveAsTextFile
的手动调整可能最好使用您自己的分布式操作来完成,foreach
在根据您自己的参数对 RDD 进行一些重新分区后工作。请参阅此处的文档:http://spark.apache.org/docs/latest/programming-guide.html#printing-elements-of-an-rdd
如果您想进行实验,子类化 TextDelimitedIndexedDatasetReaderWriter 以提供您自己的编写器特征。 Mahout 有一个 mapBlock
操作也可以使用。它向每个 mapBlock
传递一个行块,您可以使用 BiMap 编写它来转换 ids。
希望听到 Mahout 用户列表中的任何结果。
我在 HDP 上以 YARN 客户端模式在 Spark 1.5.1 上使用 Mahout 0.11.0 2.2 来自 cli 的集群。我的输入大约是 325Mb,分成 1000 个部分文件。这是我调用的确切命令:
$MAHOUT_HOME/bin/mahout spark-itemsimilarity --input unit-similarity-dump/bpc1 --output mahout-cooccurrence-output4 --maxPrefs 200 --maxSimilaritiesPerItem 100 --master yarn-client --sparkExecutorMem 10g -D:spark.yarn.executor.memoryOverhead=1024 -D:spark.executor.cores=5 -D:spark.executor.instances=50 -D:spark.yarn.am.memory=4g -D:spark.yarn.am.memoryOverhead=512 -D:spark.yarn.am.cores=2 -D:spark.driver.memory=20g -D:spark.yarn.driver.memoryOverhead=2048 -D:spark.driver.cores=4 -D:spark.driver.maxResultSize=10g -D:spark.yarn.queue=product -D:hdp.version=2.2.6.0-2800
应用程序运行良好,直到最后阶段 saveAsTextFile
被调用。此时,任务逐渐停止,每个任务都需要 45 分钟到一个小时才能成功。仔细检查后,似乎每个任务都在读取 MapPartitionsRDD 的所有 1000 个分区,直觉上我认为这一定是性能问题的根源。这些分区在所有执行程序中均匀分布,因此我认为每个任务都需要从不是其直接父级的 n-1 个执行程序请求所有分区。
优化此应用程序的最佳方法是什么?更少的分区,所以要请求的远程数据更少?更少的执行者,所以每个任务的本地化数据百分比更高?尝试为 RDD 强制设置更高的复制因子?现在它似乎默认为 存储级别:内存反序列化 1x 复制,100% 缓存。
为清楚起见,这里是舞台详细信息的屏幕截图:saveAsTextFile stage
提前感谢您的任何见解。
更新:
我尝试只使用 1 个多核执行器(即任务),尽管所有 RDD 分区都存在于单个本地执行器上,但性能仍然很慢。我认为唯一的罪魁祸首是最终 saveAsTextFile
DAG 中 reduceByKey
造成的洗牌。
第二次更新:
我也尝试过只使用 1 个输入分区,而我以前使用过 100 个甚至 1000 个。结果非常相似,总结如下 here。为清楚起见,在此 运行 中,我使用了一个 20g 的 5 核执行器。然而,这种方法确实导致聚合资源分配减少了大约一个数量级(以 MB 秒和 vcore 秒为单位)。这可能是由于先前 运行 中执行程序和线程的过度分配,并暗示瓶颈可能不受计算限制。
不确定我是否遵循了上面的所有描述。有一个 BiMap 双向字典,可以将列和行 id 从序数 Mahout id 转换为字符串外部 id。这些数据结构在内存中,每种类型的 id (row/column) 相当于 2 个哈希图。 reduceByKey 适用于 Mahout id,因此转换仅在输入和输出期间发生。这些数据结构被读入驱动程序,然后广播到每个节点,每个节点只制作一份副本,其中 BiMap(实际上是 BiDictionary)由执行程序共享。
分区默认设置为"auto"。 Mahout 11 中的哪个值应该是针对同现计算优化的值,这就是事情 "hum along" 的原因。
最后一步在 reduceByKey 获取剩余矩阵(行键、向量)中的每个值,将键和向量元素的每个 id 转换回字符串并写出将文本并行发送到文件。
坦率地说,我发现文本文件的读写在很大程度上依赖于手动调整。我的主要经验是并行读取,其中 Spark 在分区之前读取所有文件统计信息——全部。与在阅读之前获取 1000 个文件并将它们连接成一个文件相比,这非常慢(自己尝试一下,他们可能已经解决了这个问题)。
我觉得你需要更好的 saveAsTextFile
。 saveAsTextFile
的手动调整可能最好使用您自己的分布式操作来完成,foreach
在根据您自己的参数对 RDD 进行一些重新分区后工作。请参阅此处的文档:http://spark.apache.org/docs/latest/programming-guide.html#printing-elements-of-an-rdd
如果您想进行实验,子类化 TextDelimitedIndexedDatasetReaderWriter 以提供您自己的编写器特征。 Mahout 有一个 mapBlock
操作也可以使用。它向每个 mapBlock
传递一个行块,您可以使用 BiMap 编写它来转换 ids。
希望听到 Mahout 用户列表中的任何结果。