Spark - 减少操作时间过长
Spark - Reduce operation taking too long
我正在使用 Spark 制作一个应用程序,它将 运行 一些主题提取算法。为此,首先我需要进行一些预处理,最后提取文档术语矩阵。我可以做到这一点,但是对于(不是那么多)大量文档(只有 2000,5MB),这个过程要花很长时间。
所以,调试,我发现程序有点卡住了,它在减少操作。我在这部分代码中所做的是计算每个术语在集合中出现的次数,所以首先我做了一个 "map",为每个 rdd 计算它,然后我 "reduce" 它,将结果保存在哈希图中。 map操作非常快,但是在reduce中,它把操作分成40个块,每个块需要5~10分钟来处理。
所以我想弄清楚我做错了什么,或者 reduce 操作的成本是否那么高。
SparkConf:独立模式,使用local[2]。我试过将它用作 "spark://master:7077",它起作用了,但仍然很慢。
代码:
"filesIn" 是一个 JavaPairRDD,其中键是文件路径,值是文件的内容。
所以,首先是地图,我把这个"filesIn",拆分单词,然后计算它们的频率(在这种情况下,什么文档并不重要)
然后是 reduce,我在其中创建了一个 HashMap (term, freq).
JavaRDD<HashMap<String, Integer>> termDF_ = filesIn.map(new Function<Tuple2<String, String>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> call(Tuple2<String, String> t) throws Exception {
String[] allWords = t._2.split(" ");
HashMap<String, Double> hashTermFreq = new HashMap<String, Double>();
ArrayList<String> words = new ArrayList<String>();
ArrayList<String> terms = new ArrayList<String>();
HashMap<String, Integer> termDF = new HashMap<String, Integer>();
for (String term : allWords) {
if (hashTermFreq.containsKey(term)) {
Double freq = hashTermFreq.get(term);
hashTermFreq.put(term, freq + 1);
} else {
if (term.length() > 1) {
hashTermFreq.put(term, 1.0);
if (!terms.contains(term)) {
terms.add(term);
}
if (!words.contains(term)) {
words.add(term);
if (termDF.containsKey(term)) {
int value = termDF.get(term);
value++;
termDF.put(term, value);
} else {
termDF.put(term, 1);
}
}
}
}
}
return termDF;
}
});
HashMap<String, Integer> termDF = termDF_.reduce(new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> call(HashMap<String, Integer> t1, HashMap<String, Integer> t2) throws Exception {
HashMap<String, Integer> result = new HashMap<String, Integer>();
Iterator iterator = t1.keySet().iterator();
while (iterator.hasNext()) {
String key = (String) iterator.next();
if (result.containsKey(key) == false) {
result.put(key, t1.get(key));
} else {
result.put(key, result.get(key) + 1);
}
}
iterator = t2.keySet().iterator();
while (iterator.hasNext()) {
String key = (String) iterator.next();
if (result.containsKey(key) == false) {
result.put(key, t2.get(key));
} else {
result.put(key, result.get(key) + 1);
}
}
return result;
}
});
谢谢!
好的,所以就在我的脑海里:
- Spark 转换是惰性的。这意味着
map
在您调用后续 reduce
操作之前不会执行,因此您描述的慢速 reduce
很可能是慢速 map
+ reduce
ArrayList.contains
是 O(N) 所以所有这些 words.contains
和 terms.contains
都非常低效
map
逻辑有问题。特别是:
- 如果学期已经看过,你永远不会进入
else
分支
- 乍一看
words
和terms
应该有完全相同的内容,应该等同于hashTermFreq
键或termDF
键。
- 看起来
termDF
中的值只能取值 1。如果这是您想要的并且您忽略了频率,那么创建 hashTermFreq
的意义何在?
reduce
阶段在此处实现意味着低效的线性扫描,对象在数据上不断增长,而您真正想要的是 reduceByKey
。
使用 Scala 作为伪代码,您的整个代码可以有效地表达如下:
val termDF = filesIn.flatMap{
case (_, text) =>
text.split(" ") // Split
.toSet // Take unique terms
.filter(_.size > 1) // Remove single characters
.map(term => (term, 1))} // map to pairs
.reduceByKey(_ + _) // Reduce by key
termDF.collectAsMap // Optionally
看来您终于要重新发明轮子了。至少您需要的一些工具已经在 mllib.feature
or ml.feature
中实现
我正在使用 Spark 制作一个应用程序,它将 运行 一些主题提取算法。为此,首先我需要进行一些预处理,最后提取文档术语矩阵。我可以做到这一点,但是对于(不是那么多)大量文档(只有 2000,5MB),这个过程要花很长时间。
所以,调试,我发现程序有点卡住了,它在减少操作。我在这部分代码中所做的是计算每个术语在集合中出现的次数,所以首先我做了一个 "map",为每个 rdd 计算它,然后我 "reduce" 它,将结果保存在哈希图中。 map操作非常快,但是在reduce中,它把操作分成40个块,每个块需要5~10分钟来处理。
所以我想弄清楚我做错了什么,或者 reduce 操作的成本是否那么高。
SparkConf:独立模式,使用local[2]。我试过将它用作 "spark://master:7077",它起作用了,但仍然很慢。
代码:
"filesIn" 是一个 JavaPairRDD,其中键是文件路径,值是文件的内容。 所以,首先是地图,我把这个"filesIn",拆分单词,然后计算它们的频率(在这种情况下,什么文档并不重要) 然后是 reduce,我在其中创建了一个 HashMap (term, freq).
JavaRDD<HashMap<String, Integer>> termDF_ = filesIn.map(new Function<Tuple2<String, String>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> call(Tuple2<String, String> t) throws Exception {
String[] allWords = t._2.split(" ");
HashMap<String, Double> hashTermFreq = new HashMap<String, Double>();
ArrayList<String> words = new ArrayList<String>();
ArrayList<String> terms = new ArrayList<String>();
HashMap<String, Integer> termDF = new HashMap<String, Integer>();
for (String term : allWords) {
if (hashTermFreq.containsKey(term)) {
Double freq = hashTermFreq.get(term);
hashTermFreq.put(term, freq + 1);
} else {
if (term.length() > 1) {
hashTermFreq.put(term, 1.0);
if (!terms.contains(term)) {
terms.add(term);
}
if (!words.contains(term)) {
words.add(term);
if (termDF.containsKey(term)) {
int value = termDF.get(term);
value++;
termDF.put(term, value);
} else {
termDF.put(term, 1);
}
}
}
}
}
return termDF;
}
});
HashMap<String, Integer> termDF = termDF_.reduce(new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> call(HashMap<String, Integer> t1, HashMap<String, Integer> t2) throws Exception {
HashMap<String, Integer> result = new HashMap<String, Integer>();
Iterator iterator = t1.keySet().iterator();
while (iterator.hasNext()) {
String key = (String) iterator.next();
if (result.containsKey(key) == false) {
result.put(key, t1.get(key));
} else {
result.put(key, result.get(key) + 1);
}
}
iterator = t2.keySet().iterator();
while (iterator.hasNext()) {
String key = (String) iterator.next();
if (result.containsKey(key) == false) {
result.put(key, t2.get(key));
} else {
result.put(key, result.get(key) + 1);
}
}
return result;
}
});
谢谢!
好的,所以就在我的脑海里:
- Spark 转换是惰性的。这意味着
map
在您调用后续reduce
操作之前不会执行,因此您描述的慢速reduce
很可能是慢速map
+reduce
ArrayList.contains
是 O(N) 所以所有这些words.contains
和terms.contains
都非常低效map
逻辑有问题。特别是:- 如果学期已经看过,你永远不会进入
else
分支 - 乍一看
words
和terms
应该有完全相同的内容,应该等同于hashTermFreq
键或termDF
键。 - 看起来
termDF
中的值只能取值 1。如果这是您想要的并且您忽略了频率,那么创建hashTermFreq
的意义何在?
- 如果学期已经看过,你永远不会进入
reduce
阶段在此处实现意味着低效的线性扫描,对象在数据上不断增长,而您真正想要的是reduceByKey
。
使用 Scala 作为伪代码,您的整个代码可以有效地表达如下:
val termDF = filesIn.flatMap{
case (_, text) =>
text.split(" ") // Split
.toSet // Take unique terms
.filter(_.size > 1) // Remove single characters
.map(term => (term, 1))} // map to pairs
.reduceByKey(_ + _) // Reduce by key
termDF.collectAsMap // Optionally
看来您终于要重新发明轮子了。至少您需要的一些工具已经在 mllib.feature
or ml.feature