Elasticsearch + Apache Spark 性能
Elasticsearch + Apache Spark performance
我正在尝试使用 Apache spark 在 Elasticsearch 中查询我的数据,但我的 spark 作业需要大约 20 个小时来进行聚合并且仍然 运行ning。 ES 中的相同查询大约需要 6 秒。
我了解数据必须从 Elasticsearch 集群移动到我的 Spark 集群,并在 Spark 中进行一些数据洗牌。
我的 ES 索引中的数据约为。 3 亿个文档,每个文档有大约 400 个字段(1.4Terrabyte)。
我有一个 3 节点的 spark 集群(1 个主节点,2 个工作节点),总共有 60GB 内存和 8 个内核。
运行 花费的时间不可接受,有没有办法让我的 spark 作业 运行 更快?
这是我的 spark 配置:
SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
.setMaster("spark://10.0.0.203:7077")
.set("es.nodes", "10.0.0.207")
.set("es.cluster", "wp-es-reporting-prod")
.setJars(JavaSparkContext.jarOfClass(Demo.class))
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.default.parallelism", String.valueOf(cpus * 2))
.set("spark.executor.memory", "8g");
已编辑
SparkContext sparkCtx = new SparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkCtx);
DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");
DataFrame dfCleaned = cleanSchema(sqlContext, df);
dfCleaned.registerTempTable("RPT");
DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT group by agent");
for (Row row : sqlDFTest.collect()) {
System.out.println(">> " + row);
}
恐怕您无法以 120 GB 的总 RAM 执行超过 1.4 TB 的组并获得良好的性能。
DF 会尝试加载 memory/disk 中的所有数据,然后才会执行 group by。我认为目前 spark/ES 连接器不会翻译 ES 查询语言中的 sql 语法。
我知道发生了什么,基本上,我试图操纵数据框模式,因为我有一些带点的字段,例如 user.firstname。
这似乎会在 spark 的收集阶段引起问题。为了解决这个问题,我不得不重新索引我的数据,这样我的字段就不再有点,而是有下划线,例如 user_firstname.
我正在尝试使用 Apache spark 在 Elasticsearch 中查询我的数据,但我的 spark 作业需要大约 20 个小时来进行聚合并且仍然 运行ning。 ES 中的相同查询大约需要 6 秒。
我了解数据必须从 Elasticsearch 集群移动到我的 Spark 集群,并在 Spark 中进行一些数据洗牌。
我的 ES 索引中的数据约为。 3 亿个文档,每个文档有大约 400 个字段(1.4Terrabyte)。
我有一个 3 节点的 spark 集群(1 个主节点,2 个工作节点),总共有 60GB 内存和 8 个内核。
运行 花费的时间不可接受,有没有办法让我的 spark 作业 运行 更快?
这是我的 spark 配置:
SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
.setMaster("spark://10.0.0.203:7077")
.set("es.nodes", "10.0.0.207")
.set("es.cluster", "wp-es-reporting-prod")
.setJars(JavaSparkContext.jarOfClass(Demo.class))
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.default.parallelism", String.valueOf(cpus * 2))
.set("spark.executor.memory", "8g");
已编辑
SparkContext sparkCtx = new SparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkCtx);
DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");
DataFrame dfCleaned = cleanSchema(sqlContext, df);
dfCleaned.registerTempTable("RPT");
DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT group by agent");
for (Row row : sqlDFTest.collect()) {
System.out.println(">> " + row);
}
恐怕您无法以 120 GB 的总 RAM 执行超过 1.4 TB 的组并获得良好的性能。 DF 会尝试加载 memory/disk 中的所有数据,然后才会执行 group by。我认为目前 spark/ES 连接器不会翻译 ES 查询语言中的 sql 语法。
我知道发生了什么,基本上,我试图操纵数据框模式,因为我有一些带点的字段,例如 user.firstname。 这似乎会在 spark 的收集阶段引起问题。为了解决这个问题,我不得不重新索引我的数据,这样我的字段就不再有点,而是有下划线,例如 user_firstname.