Elasticsearch + Apache Spark 性能

Elasticsearch + Apache Spark performance

我正在尝试使用 Apache spark 在 Elasticsearch 中查询我的数据,但我的 spark 作业需要大约 20 个小时来进行聚合并且仍然 运行ning。 ES 中的相同查询大约需要 6 秒。

我了解数据必须从 Elasticsearch 集群移动到我的 Spark 集群,并在 Spark 中进行一些数据洗牌。

我的 ES 索引中的数据约为。 3 亿个文档,每个文档有大约 400 个字段(1.4Terrabyte)。

我有一个 3 节点的 spark 集群(1 个主节点,2 个工作节点),总共有 60GB 内存和 8 个内核。

运行 花费的时间不可接受,有没有办法让我的 spark 作业 运行 更快?

这是我的 spark 配置:

SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
                 .setMaster("spark://10.0.0.203:7077")    
                 .set("es.nodes", "10.0.0.207")
                 .set("es.cluster", "wp-es-reporting-prod")              
                .setJars(JavaSparkContext.jarOfClass(Demo.class))
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.default.parallelism", String.valueOf(cpus * 2))
                .set("spark.executor.memory", "8g");

已编辑

    SparkContext sparkCtx = new SparkContext(sparkConf);

    SQLContext sqlContext = new SQLContext(sparkCtx);
    DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");

    DataFrame dfCleaned = cleanSchema(sqlContext, df);

    dfCleaned.registerTempTable("RPT");

    DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT group by agent");

    for (Row row : sqlDFTest.collect()) {
        System.out.println(">> " + row);
    }

恐怕您无法以 120 GB 的总 RAM 执行超过 1.4 TB 的组并获得良好的性能。 DF 会尝试加载 memory/disk 中的所有数据,然后才会执行 group by。我认为目前 spark/ES 连接器不会翻译 ES 查询语言中的 sql 语法。

我知道发生了什么,基本上,我试图操纵数据框模式,因为我有一些带点的字段,例如 user.firstname。 这似乎会在 spark 的收集阶段引起问题。为了解决这个问题,我不得不重新索引我的数据,这样我的字段就不再有点,而是有下划线,例如 user_firstname.