数据帧解析少量数据的速度很慢
Dataframes are slow to parse through small amount of data
我有 2 类 在 Apache Spark 中执行类似的任务,但是使用数据帧的任务比 "regular" 使用 RDD 的任务慢很多倍。 (30x)
我想使用数据框,因为它会减少很多代码,类我们有,但显然我不能让它慢那么多。
数据集没什么大不了的。我们有 30 个文件,每个文件都有 json 数据,每个文件都是关于从另一个软件中的活动触发的事件。每个文件中有 0 到 100 个事件。
一个包含 82 个事件的数据集用数据帧处理大约需要 5 分钟。
示例代码:
public static void main(String[] args) throws ParseException, IOException {
SparkConf sc = new SparkConf().setAppName("POC");
JavaSparkContext jsc = new JavaSparkContext(sc);
SQLContext sqlContext = new SQLContext(jsc);
conf = new ConfImpl();
HashSet<String> siteSet = new HashSet<>();
// last month
Date yesterday = monthDate(DateUtils.addDays(new Date(), -1)); // method that returns the date on the first of the month
Date startTime = startofYear(new Date(yesterday.getTime())); // method that returns the date on the first of the year
// list all the sites with a metric file
JavaPairRDD<String, String> allMetricFiles = jsc.wholeTextFiles("hdfs:///somePath/*/poc.json");
for ( Tuple2<String, String> each : allMetricFiles.toArray() ) {
logger.info("Reading from " + each._1);
DataFrame metric = sqlContext.read().format("json").load(each._1).cache();
metric.count();
boolean siteNameDisplayed = false;
boolean dateDisplayed = false;
do {
Date endTime = DateUtils.addMonths(startTime, 1);
HashSet<Row> totalUsersForThisMonth = new HashSet<>();
for (String dataPoint : Conf.DataPoints) { // This is a String[] with 4 elements for this specific case
try {
if (siteNameDisplayed == false) {
String siteName = parseSiteFromPath(each._1); // method returning a parsed String
logger.info("Data for site: " + siteName);
siteSet.add(siteName);
siteNameDisplayed = true;
}
if ( dateDisplayed == false ) {
logger.info("Month: " + formatDate(startTime)); // SimpleFormatDate("yyyy-MM-dd")
dateDisplayed = true;
}
DataFrame lastMonth = metric.filter("event.eventId=\"" + dataPoint + "\"").filter("creationDate >= " + startTime.getTime()).filter("creationDate < " + endTime.getTime()).select("event.data.UserId").distinct();
logger.info("Distinct for last month for " + dataPoint + ": " + lastMonth.count());
totalUsersForThisMonth.addAll(lastMonth.collectAsList());
} catch (Exception e) {
// data does not fit the expected model so there is nothing to print
}
}
logger.info("Total Unique for the month: " + totalStudentForThisMonth.size());
startTime = DateUtils.addMonths(startTime, 1);
dateDisplayed = false;
} while ( startTime.getTime() < commonTmsMetric.monthDate(yesterday).getTime());
// reset startTime for the next site
startTime = commonTmsMetric.StartofYear(new Date(yesterday.getTime()));
}
}
这段代码中有一些地方效率不高,但当我查看日志时,它只增加了整个处理过程的几秒钟。
我一定是漏掉了什么大东西。
我有 运行 这个有 2 个执行者和 1 个执行者,5 分钟差 20 秒。
这是 Hadoop 2.5.0 上的 运行 Java 1.7 和 Spark 1.4.1。
谢谢!
所以有一些事情,但如果没有看到不同任务及其时间的细分就很难说。简短的版本是您在驱动程序中做了很多工作,而没有利用 Spark 的分布式功能。
例如,您正在将所有数据收集回驱动程序(toArray() 和您的 for 循环)。相反,您应该将 Spark SQL 指向需要加载的文件。
对于操作员,您似乎在驱动程序中进行了很多聚合,相反,您可以使用驱动程序生成聚合并让 Spark SQL 执行它们。
您的内部代码和 DataFrame 代码之间的另一个重大区别是模式推断。由于您已经创建了 类 来表示您的数据,因此您似乎知道 JSON 数据的架构。您可以通过在读取时添加模式信息来加快代码速度,这样 Spark SQL 就可以跳过推理。
我建议重新审视这种方法并尝试使用 Spark SQL 的分布式运算符构建一些东西。
我有 2 类 在 Apache Spark 中执行类似的任务,但是使用数据帧的任务比 "regular" 使用 RDD 的任务慢很多倍。 (30x)
我想使用数据框,因为它会减少很多代码,类我们有,但显然我不能让它慢那么多。
数据集没什么大不了的。我们有 30 个文件,每个文件都有 json 数据,每个文件都是关于从另一个软件中的活动触发的事件。每个文件中有 0 到 100 个事件。
一个包含 82 个事件的数据集用数据帧处理大约需要 5 分钟。
示例代码:
public static void main(String[] args) throws ParseException, IOException {
SparkConf sc = new SparkConf().setAppName("POC");
JavaSparkContext jsc = new JavaSparkContext(sc);
SQLContext sqlContext = new SQLContext(jsc);
conf = new ConfImpl();
HashSet<String> siteSet = new HashSet<>();
// last month
Date yesterday = monthDate(DateUtils.addDays(new Date(), -1)); // method that returns the date on the first of the month
Date startTime = startofYear(new Date(yesterday.getTime())); // method that returns the date on the first of the year
// list all the sites with a metric file
JavaPairRDD<String, String> allMetricFiles = jsc.wholeTextFiles("hdfs:///somePath/*/poc.json");
for ( Tuple2<String, String> each : allMetricFiles.toArray() ) {
logger.info("Reading from " + each._1);
DataFrame metric = sqlContext.read().format("json").load(each._1).cache();
metric.count();
boolean siteNameDisplayed = false;
boolean dateDisplayed = false;
do {
Date endTime = DateUtils.addMonths(startTime, 1);
HashSet<Row> totalUsersForThisMonth = new HashSet<>();
for (String dataPoint : Conf.DataPoints) { // This is a String[] with 4 elements for this specific case
try {
if (siteNameDisplayed == false) {
String siteName = parseSiteFromPath(each._1); // method returning a parsed String
logger.info("Data for site: " + siteName);
siteSet.add(siteName);
siteNameDisplayed = true;
}
if ( dateDisplayed == false ) {
logger.info("Month: " + formatDate(startTime)); // SimpleFormatDate("yyyy-MM-dd")
dateDisplayed = true;
}
DataFrame lastMonth = metric.filter("event.eventId=\"" + dataPoint + "\"").filter("creationDate >= " + startTime.getTime()).filter("creationDate < " + endTime.getTime()).select("event.data.UserId").distinct();
logger.info("Distinct for last month for " + dataPoint + ": " + lastMonth.count());
totalUsersForThisMonth.addAll(lastMonth.collectAsList());
} catch (Exception e) {
// data does not fit the expected model so there is nothing to print
}
}
logger.info("Total Unique for the month: " + totalStudentForThisMonth.size());
startTime = DateUtils.addMonths(startTime, 1);
dateDisplayed = false;
} while ( startTime.getTime() < commonTmsMetric.monthDate(yesterday).getTime());
// reset startTime for the next site
startTime = commonTmsMetric.StartofYear(new Date(yesterday.getTime()));
}
}
这段代码中有一些地方效率不高,但当我查看日志时,它只增加了整个处理过程的几秒钟。
我一定是漏掉了什么大东西。
我有 运行 这个有 2 个执行者和 1 个执行者,5 分钟差 20 秒。
这是 Hadoop 2.5.0 上的 运行 Java 1.7 和 Spark 1.4.1。
谢谢!
所以有一些事情,但如果没有看到不同任务及其时间的细分就很难说。简短的版本是您在驱动程序中做了很多工作,而没有利用 Spark 的分布式功能。
例如,您正在将所有数据收集回驱动程序(toArray() 和您的 for 循环)。相反,您应该将 Spark SQL 指向需要加载的文件。
对于操作员,您似乎在驱动程序中进行了很多聚合,相反,您可以使用驱动程序生成聚合并让 Spark SQL 执行它们。
您的内部代码和 DataFrame 代码之间的另一个重大区别是模式推断。由于您已经创建了 类 来表示您的数据,因此您似乎知道 JSON 数据的架构。您可以通过在读取时添加模式信息来加快代码速度,这样 Spark SQL 就可以跳过推理。
我建议重新审视这种方法并尝试使用 Spark SQL 的分布式运算符构建一些东西。