使用 Apache Spark 2.0.0 和 mllib 进行分布式 Word2Vec 模型训练
Distributed Word2Vec Model Training using Apache Spark 2.0.0 and mllib
我一直在尝试使用 spark 和 mllib 来训练 word2vec 模型,但我似乎没有在大型数据集上获得分布式机器学习的性能优势。我的理解是,如果我有 w 个工作人员,那么,如果我创建一个具有 n 个分区的 RDD,其中 n>w 并且我尝试通过调用 Word2Vec 的 fit 函数来创建一个 Word2Vec 模型,并将 RDD 作为参数,那么 spark 将分发数据统一地在这些 w 个 worker 上训练单独的 word2vec 模型,并在最后使用某种 reducer 函数从这些 w 个模型创建一个单一的输出模型。这将减少计算时间,而不是 1 个块,w 个数据块将被同时处理。权衡是可能会发生一些精度损失,具体取决于最后使用的 reducer 函数。 Spark 中的 Word2Vec 是否真的以这种方式工作?如果确实如此,我可能需要使用可配置参数。
编辑
添加问这个问题的原因。我 运行 java 在 10 台工作机器上启动 word2vec 代码,并为执行器内存、驱动程序内存和 num-executors 设置合适的值,在阅读文档后,为映射的 2.5gb 输入文本文件到 rdd 分区,然后用作 mllib word2vec 模型的训练数据。训练部分花了好几个小时。工作节点的数量似乎对训练时间没有太大影响。相同的代码在较小的数据文件(大约 10 MB)上成功运行
代码
SparkConf conf = new SparkConf().setAppName("SampleWord2Vec");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{String.class, List.class});
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<List<String>> jrdd = jsc.textFile(inputFile, 3).map(new Function<String, List<String>>(){
@Override
public List<String> call(String s) throws Exception {
return Arrays.asList(s.split(","));
}
});
jrdd.persist(StorageLevel.MEMORY_AND_DISK());
Word2Vec word2Vec = new Word2Vec()
.setWindowSize(20)
.setMinCount(20);
Word2VecModel model = word2Vec.fit(jrdd);
jrdd.unpersist(false);
model.save(jsc.sc(), outputfile);
jsc.stop();
jsc.close();
我没有发现您的代码有任何本质上的错误。但是,我强烈建议您考虑数据框 API。例如,这里有一张经常被抛出的小图表:
此外,我不知道您可能 "iterating" 是如何处理数据框的元素的(它们实际上不是这样工作的)。这是 Spark online docs:
中的示例
您有大致的想法...但是您必须首先将数据并行化为数据框。将 javardd 转换为 DataFrame 非常简单。
DataFrame fileDF = sqlContext.createDataFrame(jrdd, Model.class);
Spark 运行用有向无环图 (DAG) 代替 MR,但概念是相同的。 运行 'fit()
你的数据确实会 运行 跨越工人的数据,然后减少到一个单一的模型。但是这个模型本身会分布在内存中,直到你决定把它写下来。
但是,作为试验,通过 NLTK 或 Word2Vec 的原生 C++ 二进制文件 运行 同一个文件需要多长时间?
最后一个想法......你坚持使用内存和磁盘是有原因的吗? Spark 有一个原生的 .cache()
,默认情况下持久化到内存中。 Spark 的强大之处在于对内存中保存的数据进行机器学习……内存中的大数据。如果你坚持使用磁盘,即使使用 kryo,你也会在磁盘 I/O 上造成瓶颈。恕我直言,首先要尝试的是摆脱它并坚持记忆。如果性能有所提高,那很好,但是通过 DataFrames 依靠 Catalyst 的强大功能,您会发现性能突飞猛进。
我们没有讨论的一件事是您的集群。考虑一下您拥有的每个节点有多少内存……每个节点有多少个内核……您的集群是否与其他请求资源的应用程序虚拟化(像大多数虚拟主机一样过度配置)……是有帮助的你的集群在云端?共享还是专用?
你看过Spark的UI来分析代码的运行时间操作吗?当模型正在拟合时,当您 运行 top
在工人身上时,您会看到什么?您能看到完整的 CPU 利用率吗?您是否尝试过指定 --executor-cores
以确保充分利用 CPU?
我已经多次看到所有工作都在一个工作节点上的一个核心上完成的情况。获得此信息会很有帮助。
在对性能进行故障排除时,需要查看很多地方,包括 Spark 配置文件本身!
从评论、回答和反对票来看,我想我没能正确地提出我的问题。但我想知道的答案是肯定的,可以在 spark 上并行训练你的 word2vec 模型。此功能的拉取请求创建于很久以前:
https://github.com/apache/spark/pull/1719
在java中,spark mllib中的Word2Vec对象有一个setter方法(setNumPartitions)。这允许您在多个执行器上并行训练 word2vec 模型。
根据上述拉取请求的评论:
"为了使我们的实现更具可扩展性,我们分别训练每个分区,并在每次迭代后合并每个分区的模型。为了使模型更准确,可能需要多次迭代。"
希望这对某人有所帮助。
我一直在尝试使用 spark 和 mllib 来训练 word2vec 模型,但我似乎没有在大型数据集上获得分布式机器学习的性能优势。我的理解是,如果我有 w 个工作人员,那么,如果我创建一个具有 n 个分区的 RDD,其中 n>w 并且我尝试通过调用 Word2Vec 的 fit 函数来创建一个 Word2Vec 模型,并将 RDD 作为参数,那么 spark 将分发数据统一地在这些 w 个 worker 上训练单独的 word2vec 模型,并在最后使用某种 reducer 函数从这些 w 个模型创建一个单一的输出模型。这将减少计算时间,而不是 1 个块,w 个数据块将被同时处理。权衡是可能会发生一些精度损失,具体取决于最后使用的 reducer 函数。 Spark 中的 Word2Vec 是否真的以这种方式工作?如果确实如此,我可能需要使用可配置参数。
编辑
添加问这个问题的原因。我 运行 java 在 10 台工作机器上启动 word2vec 代码,并为执行器内存、驱动程序内存和 num-executors 设置合适的值,在阅读文档后,为映射的 2.5gb 输入文本文件到 rdd 分区,然后用作 mllib word2vec 模型的训练数据。训练部分花了好几个小时。工作节点的数量似乎对训练时间没有太大影响。相同的代码在较小的数据文件(大约 10 MB)上成功运行
代码
SparkConf conf = new SparkConf().setAppName("SampleWord2Vec");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{String.class, List.class});
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<List<String>> jrdd = jsc.textFile(inputFile, 3).map(new Function<String, List<String>>(){
@Override
public List<String> call(String s) throws Exception {
return Arrays.asList(s.split(","));
}
});
jrdd.persist(StorageLevel.MEMORY_AND_DISK());
Word2Vec word2Vec = new Word2Vec()
.setWindowSize(20)
.setMinCount(20);
Word2VecModel model = word2Vec.fit(jrdd);
jrdd.unpersist(false);
model.save(jsc.sc(), outputfile);
jsc.stop();
jsc.close();
我没有发现您的代码有任何本质上的错误。但是,我强烈建议您考虑数据框 API。例如,这里有一张经常被抛出的小图表:
此外,我不知道您可能 "iterating" 是如何处理数据框的元素的(它们实际上不是这样工作的)。这是 Spark online docs:
中的示例您有大致的想法...但是您必须首先将数据并行化为数据框。将 javardd 转换为 DataFrame 非常简单。
DataFrame fileDF = sqlContext.createDataFrame(jrdd, Model.class);
Spark 运行用有向无环图 (DAG) 代替 MR,但概念是相同的。 运行 'fit()
你的数据确实会 运行 跨越工人的数据,然后减少到一个单一的模型。但是这个模型本身会分布在内存中,直到你决定把它写下来。
但是,作为试验,通过 NLTK 或 Word2Vec 的原生 C++ 二进制文件 运行 同一个文件需要多长时间?
最后一个想法......你坚持使用内存和磁盘是有原因的吗? Spark 有一个原生的 .cache()
,默认情况下持久化到内存中。 Spark 的强大之处在于对内存中保存的数据进行机器学习……内存中的大数据。如果你坚持使用磁盘,即使使用 kryo,你也会在磁盘 I/O 上造成瓶颈。恕我直言,首先要尝试的是摆脱它并坚持记忆。如果性能有所提高,那很好,但是通过 DataFrames 依靠 Catalyst 的强大功能,您会发现性能突飞猛进。
我们没有讨论的一件事是您的集群。考虑一下您拥有的每个节点有多少内存……每个节点有多少个内核……您的集群是否与其他请求资源的应用程序虚拟化(像大多数虚拟主机一样过度配置)……是有帮助的你的集群在云端?共享还是专用?
你看过Spark的UI来分析代码的运行时间操作吗?当模型正在拟合时,当您 运行 top
在工人身上时,您会看到什么?您能看到完整的 CPU 利用率吗?您是否尝试过指定 --executor-cores
以确保充分利用 CPU?
我已经多次看到所有工作都在一个工作节点上的一个核心上完成的情况。获得此信息会很有帮助。
在对性能进行故障排除时,需要查看很多地方,包括 Spark 配置文件本身!
从评论、回答和反对票来看,我想我没能正确地提出我的问题。但我想知道的答案是肯定的,可以在 spark 上并行训练你的 word2vec 模型。此功能的拉取请求创建于很久以前:
https://github.com/apache/spark/pull/1719
在java中,spark mllib中的Word2Vec对象有一个setter方法(setNumPartitions)。这允许您在多个执行器上并行训练 word2vec 模型。 根据上述拉取请求的评论:
"为了使我们的实现更具可扩展性,我们分别训练每个分区,并在每次迭代后合并每个分区的模型。为了使模型更准确,可能需要多次迭代。"
希望这对某人有所帮助。