Spark Cassandra Aggregation java.lang.OutOfMemoryError: Java heap space

Spark Cassandra Aggregation java.lang.OutOfMemoryError: Java heap space

我一直在尝试学习如何使用 Apache Spark,但在尝试对 Cassandra 的列中的所有值求和时遇到问题(使用 datastax spark-cassandra-connector)。我尝试的一切都只会导致 java.lang.OutOfMemoryError: Java 堆 space

这是我提交给 spark master 的代码:

object Benchmark {
  def main( args: Array[ String ] ) {
    val conf    = new SparkConf()
                  .setAppName( "app" )
                  .set( "spark.cassandra.connection.host", "ec2-blah.compute-1.amazonaws.com" )
                  .set( "spark.cassandra.auth.username", "myusername" )
                  .set( "spark.cassandra.auth.password", "mypassword" )
                  .set( "spark.executor.memory", "4g" )
    val sc      = new SparkContext( conf )
    val tbl     = sc.cassandraTable( "mykeyspace", "mytable" )
    val res     = tbl.map(_.getFloat("sclrdata")).sum()

    println( "sum = " + res )
  }
}

现在我的集群中只有一个 spark worker 节点,考虑到 table 的大小,不可能一次将所有节点都放入内存。但是我认为这不是问题,因为 spark 应该懒惰地评估命令,并且对列中的所有值求和不需要将整个 table 立即驻留在内存中。

我是这个主题的新手,因此非常感谢任何关于为什么这不起作用的澄清或帮助如何正确地做到这一点。

谢谢

也许 spark 正在将整个 table 构建为内存分区中的单个分区,以便它可以对其进行映射操作。

我认为 spark 应该溢出到磁盘而不是抛出 OutOfMemoryExceptions,但如果只有一个分区,它可能无法溢出。我看到一个类似的问题here,他通过指定这样的拆分大小解决了这个问题:

conf = new SparkConf();
        conf.setAppName("Test");
        conf.setMaster("local[4]");
        conf.set("spark.cassandra.connection.host", "192.168.1.15").
        set("spark.executor.memory", "2g").
        set("spark.cassandra.input.split.size_in_mb", "67108864");

所以请尝试在您的配置文件中设置 spark.cassandra.input.split.size_in_mb。

我想这将允许 spark 总结 table 的块,然后在它需要 space 新块时从内存中逐出这些块。

您可以研究的另一件事是为 table RDD 指定一个允许它溢出到磁盘的存储级别。我认为您可以通过添加“.persist(StorageLevel.MEMORY_AND_DISK)”来做到这一点。默认值似乎是 MEMORY_ONLY。在 RDD 持久性部分查看有关存储级别 here 的更多信息。