MongoDB Spark Connector - 聚合速度慢
MongoDB Spark Connector - aggregation is slow
我运行使用 Spark 应用程序和 Mongos 控制台使用相同的聚合管道。在控制台上,数据在眨眼之间就被获取了,只需要第二次使用 "it" 就可以获取所有预期的数据。
然而,根据 Spark WebUI,Spark 应用程序需要将近两分钟的时间。
如您所见,正在启动 242 个任务来获取结果。我不确定为什么在 MongoDB 聚合仅返回 40 个文档的情况下启动如此大量的任务。看起来开销很大。
我在Mongos控制台运行的查询:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
Spark 应用代码
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
之后,我使用 hdfs dfs -getmerge /user/spark/output/ output.csv
并比较结果。
为什么聚合这么慢?调用 withPipeline
不是为了减少需要传输到 Spark 的数据量吗?看起来它并没有像 Mongos 控制台那样进行聚合。在 Mongos 控制台上,它运行得非常快。我正在使用 Spark 1.6.1 和 mongo-spark-connector_2.10 版本 1.1.0。
编辑:我想知道的另一件事是启动了两个执行程序(因为我使用的是默认执行设置 atm),但只有一个执行程序完成所有工作。为什么第二个执行者不做任何工作?
编辑 2:当使用不同的聚合管道并调用 .count()
而不是 saveAsTextFile(..)
时,还会创建 242 个任务。这次将返回 65,000 份文件。
大量任务是由默认的 Mongo Spark 分区程序策略引起的。它在计算分区时忽略聚合管道,主要原因有两个:
- 它减少了计算分区的成本
- 确保分片和非分片分区器的行为相同
但是,正如您所发现的那样,它们会生成空分区,这在您的情况下成本很高。
修复选项可以是:
更改分区策略
选择一个替代的分区器来减少分区的数量。例如,PaginateByCount 会将数据库拆分为一定数量的分区。
创建您自己的分区程序 - 只需实施该特征,您就可以应用聚合管道并对结果进行分区。有关示例,请参见 HalfwayPartitioner and custom partitioner test。
使用 $out 将结果预先聚合到一个集合中并从那里读取。
- 使用
coalesce(N)
将分区合并在一起并减少分区数量。
- 增加
spark.mongodb.input.partitionerOptions.partitionSizeMB
配置以产生更少的分区。
自定义分区程序应该会产生最佳解决方案,但有一些方法可以更好地利用可用的默认分区程序。
如果您认为应该有一个使用聚合管道计算分区的默认分区程序,请向 MongoDB Spark Jira project.
添加一个票证
我运行使用 Spark 应用程序和 Mongos 控制台使用相同的聚合管道。在控制台上,数据在眨眼之间就被获取了,只需要第二次使用 "it" 就可以获取所有预期的数据。 然而,根据 Spark WebUI,Spark 应用程序需要将近两分钟的时间。
如您所见,正在启动 242 个任务来获取结果。我不确定为什么在 MongoDB 聚合仅返回 40 个文档的情况下启动如此大量的任务。看起来开销很大。
我在Mongos控制台运行的查询:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
Spark 应用代码
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
之后,我使用 hdfs dfs -getmerge /user/spark/output/ output.csv
并比较结果。
为什么聚合这么慢?调用 withPipeline
不是为了减少需要传输到 Spark 的数据量吗?看起来它并没有像 Mongos 控制台那样进行聚合。在 Mongos 控制台上,它运行得非常快。我正在使用 Spark 1.6.1 和 mongo-spark-connector_2.10 版本 1.1.0。
编辑:我想知道的另一件事是启动了两个执行程序(因为我使用的是默认执行设置 atm),但只有一个执行程序完成所有工作。为什么第二个执行者不做任何工作?
编辑 2:当使用不同的聚合管道并调用 .count()
而不是 saveAsTextFile(..)
时,还会创建 242 个任务。这次将返回 65,000 份文件。
大量任务是由默认的 Mongo Spark 分区程序策略引起的。它在计算分区时忽略聚合管道,主要原因有两个:
- 它减少了计算分区的成本
- 确保分片和非分片分区器的行为相同
但是,正如您所发现的那样,它们会生成空分区,这在您的情况下成本很高。
修复选项可以是:
更改分区策略
选择一个替代的分区器来减少分区的数量。例如,PaginateByCount 会将数据库拆分为一定数量的分区。
创建您自己的分区程序 - 只需实施该特征,您就可以应用聚合管道并对结果进行分区。有关示例,请参见 HalfwayPartitioner and custom partitioner test。
使用 $out 将结果预先聚合到一个集合中并从那里读取。
- 使用
coalesce(N)
将分区合并在一起并减少分区数量。 - 增加
spark.mongodb.input.partitionerOptions.partitionSizeMB
配置以产生更少的分区。
自定义分区程序应该会产生最佳解决方案,但有一些方法可以更好地利用可用的默认分区程序。
如果您认为应该有一个使用聚合管道计算分区的默认分区程序,请向 MongoDB Spark Jira project.
添加一个票证