数据帧解析少量数据的速度很慢

Dataframes are slow to parse through small amount of data

我有 2 类 在 Apache Spark 中执行类似的任务,但是使用数据帧的任务比 "regular" 使用 RDD 的任务慢很多倍。 (30x)

我想使用数据框,因为它会减少很多代码,类我们有,但显然我不能让它慢那么多。

数据集没什么大不了的。我们有 30 个文件,每个文件都有 json 数据,每个文件都是关于从另一个软件中的活动触发的事件。每个文件中有 0 到 100 个事件。

一个包含 82 个事件的数据集用数据帧处理大约需要 5 分钟。

示例代码:

    public static void main(String[] args) throws ParseException, IOException {
  SparkConf sc = new SparkConf().setAppName("POC");
  JavaSparkContext jsc = new JavaSparkContext(sc);
  SQLContext sqlContext = new SQLContext(jsc);

  conf = new ConfImpl();

  HashSet<String> siteSet = new HashSet<>();

  // last month
  Date yesterday = monthDate(DateUtils.addDays(new Date(), -1)); // method that returns the date on the first of the month
  Date startTime = startofYear(new Date(yesterday.getTime())); // method that returns the date on the first of the year

  // list all the sites with a metric file
  JavaPairRDD<String, String> allMetricFiles = jsc.wholeTextFiles("hdfs:///somePath/*/poc.json");
  for ( Tuple2<String, String> each : allMetricFiles.toArray() ) {
    logger.info("Reading from " + each._1);
    DataFrame metric = sqlContext.read().format("json").load(each._1).cache();
    metric.count();
    boolean siteNameDisplayed = false;
    boolean dateDisplayed = false;

    do {
      Date endTime = DateUtils.addMonths(startTime, 1);
      HashSet<Row> totalUsersForThisMonth = new HashSet<>();
      for (String dataPoint : Conf.DataPoints) { // This is a String[] with 4 elements for this specific case
        try {
          if (siteNameDisplayed == false) {
            String siteName = parseSiteFromPath(each._1); // method returning a parsed String
            logger.info("Data for site: " + siteName);
            siteSet.add(siteName);
            siteNameDisplayed = true;
          }
          if ( dateDisplayed == false ) {
            logger.info("Month: " + formatDate(startTime)); // SimpleFormatDate("yyyy-MM-dd")
            dateDisplayed = true;
          }
          DataFrame lastMonth = metric.filter("event.eventId=\"" + dataPoint + "\"").filter("creationDate >= " + startTime.getTime()).filter("creationDate < " + endTime.getTime()).select("event.data.UserId").distinct();
          logger.info("Distinct for last month for " + dataPoint + ": " + lastMonth.count());
          totalUsersForThisMonth.addAll(lastMonth.collectAsList());
        } catch (Exception e) {
          // data does not fit the expected model so there is nothing to print
        }
      }
      logger.info("Total Unique for the month: " + totalStudentForThisMonth.size());
      startTime = DateUtils.addMonths(startTime, 1);
      dateDisplayed = false;
    } while ( startTime.getTime() < commonTmsMetric.monthDate(yesterday).getTime());

    // reset startTime for the next site
    startTime = commonTmsMetric.StartofYear(new Date(yesterday.getTime()));
  }
}

这段代码中有一些地方效率不高,但当我查看日志时,它只增加了整个处理过程的几秒钟。

我一定是漏掉了什么大东西。

我有 运行 这个有 2 个执行者和 1 个执行者,5 分钟差 20 秒。

这是 Hadoop 2.5.0 上的 运行 Java 1.7 和 Spark 1.4.1。

谢谢!

所以有一些事情,但如果没有看到不同任务及其时间的细分就很难说。简短的版本是您在驱动程序中做了很多工作,而没有利用 Spark 的分布式功能。

例如,您正在将所有数据收集回驱动程序(toArray() 和您的 for 循环)。相反,您应该将 Spark SQL 指向需要加载的文件。

对于操作员,您似乎在驱动程序中进行了很多聚合,相反,您可以使用驱动程序生成聚合并让 Spark SQL 执行它们。

您的内部代码和 DataFrame 代码之间的另一个重大区别是模式推断。由于您已经创建了 类 来表示您的数据,因此您似乎知道 JSON 数据的架构。您可以通过在读取时添加模式信息来加快代码速度,这样 Spark SQL 就可以跳过推理。

我建议重新审视这种方法并尝试使用 Spark SQL 的分布式运算符构建一些东西。