调用超过 1,000 列的 stddev 时,SparkSQL 作业失败

SparkSQL job fails when calling stddev over 1,000 columns

我在使用 Spark 2.2.1 和 Scala 2.11 的 DataBricks。我正在尝试 运行 一个如下所示的 SQL 查询。

select stddev(col1), stddev(col2), ..., stddev(col1300)
from mydb.mytable

我再执行如下代码

myRdd = sqlContext.sql(sql)

但是,我看到抛出以下异常。

Job aborted due to stage failure: Task 24 in stage 16.0 failed 4 times, most recent failure: Lost task 24.3 in stage 16.0 (TID 1946, 10.184.163.105, executor 3): org.codehaus.janino.JaninoRuntimeException: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection has grown past JVM limit of 0xFFFF
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificMutableProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private InternalRow mutableRow;
/* 009 */   private boolean evalExprIsNull;
/* 010 */   private boolean evalExprValue;
/* 011 */   private boolean evalExpr1IsNull;
/* 012 */   private boolean evalExpr1Value;
/* 013 */   private boolean evalExpr2IsNull;
/* 014 */   private boolean evalExpr2Value;
/* 015 */   private boolean evalExpr3IsNull;
/* 016 */   private boolean evalExpr3Value;
/* 017 */   private boolean evalExpr4IsNull;
/* 018 */   private boolean evalExpr4Value;
/* 019 */   private boolean evalExpr5IsNull;
/* 020 */   private boolean evalExpr5Value;
/* 021 */   private boolean evalExpr6IsNull;

stacktrace 一直在继续,甚至 Databricks notebook 都因为冗长而崩溃。有人见过这个吗?

此外,我有以下 2 个 SQL 语句来获取我执行的平均值和中位数,没有任何问题。

select avg(col1), ..., avg(col1300) from mydb.mytable
select percentile_approx(col1, 0.5), ..., percentile_approx(col1300, 0.5) from mydb.mytable

问题似乎与 stddev 有关,但异常没有帮助。对发生的事情有什么想法吗?有没有另一种方法可以轻松计算标准偏差,不会导致这个问题?

原来这个描述了同样的问题,说由于64KB大小的限制,Spark无法处理宽模式或很多列类。但是,如果是这样,那么为什么 avgpercentile_approx 有效?

几个选项:

  • 尝试禁用整个阶段代码生成:

    spark.conf.set("spark.sql.codegen.wholeStage", false)
    
  • 如果以上没有帮助切换到 RDD(从 by zeo323 采纳):

    import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
    
    val columns: Seq[String] = ???
    
    df
      .select(columns map (col(_).cast("double")): _*)
      .rdd
      .map(row => Vectors.dense(columns.map(row.getAs[Double](_)).toArray))
      .aggregate(new MultivariateOnlineSummarizer)(
         (agg, v) => agg.add(v), 
         (agg1, agg2) => agg1.merge(agg2))
    
  • Assemble 列合并为一个向量,使用 VectorAssembler 并使用 Aggregator,类似于使用 的那个,调整 finish 方法(您可能需要一些额外的调整才能将 ml.linalg.Vectors 转换为 mllib.linalg.Vectors)。

However, if that's the case, then why does avg and percentile_approx work?

Spark 确实为这些阶段生成 Java 代码。因为逻辑不一样,所以输出的大小也会不一样。