从 spark 写入 elasticsearch 非常慢
Write to elasticsearch from spark is very slow
我正在处理一个文本文件,并将转换后的行从 Spark 应用程序写入弹性搜索,如下所示
input.write.format("org.elasticsearch.spark.sql")
.mode(SaveMode.Append)
.option("es.resource", "{date}/" + dir).save()
这运行得非常慢,大约需要 8 分钟才能写入 287.9 MB / 1513789 条记录。
鉴于网络延迟始终存在,我如何调整 spark 和 elasticsearch 设置以使其更快。
我在本地模式下使用 spark,有 16 个内核和 64GB 内存。
我的 elasticsearch 集群有 1 个主节点和 3 个数据节点,每个节点有 16 个核心和 64GB。
我正在阅读如下文本文件
val readOptions: Map[String, String] = Map("ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"inferSchema" -> "false",
"header" -> "false",
"delimiter" -> "\t",
"comment" -> "#",
"mode" -> "PERMISSIVE")
.....
val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
首先,让我们从您的应用程序中发生的事情开始。 Apache Spark 正在读取 1 个(不是很大)csv
个压缩文件。因此first spark会花时间解压缩数据并在写入elasticsearch
之前对其进行扫描。
这将创建一个 Dataset
/DataFrame
和一个分区 (由您在评论中提到的 df.rdd.getNumPartitions
的结果确认) .
一个直接的解决方案是 repartition
读取数据并缓存它,然后再将其写入 elasticsearch
。现在我不确定你的数据是什么样的,所以决定分区的数量是你这边的基准测试主题。
val input = sqlContext.read.options(readOptions)
.csv(inputFile.getAbsolutePath)
.repartition(100) // 100 is just an example
.cache
我不确定对您的应用程序有多大好处,因为我相信可能还有其他瓶颈(网络 IO、ES 的磁盘类型)。
PS: 在构建 ETL 之前,我应该将 csv 转换为 parquet 文件。这里有真正的性能提升。 (个人意见和基准)
另一个可能的优化是调整 elasticsearch-spark 连接器的 es.batch.size.entries
设置。默认值为 1000
.
设置此参数时需要小心,因为您可能会使elasticsearch过载。我强烈建议您查看可用的配置 here。
希望对您有所帮助!
我正在处理一个文本文件,并将转换后的行从 Spark 应用程序写入弹性搜索,如下所示
input.write.format("org.elasticsearch.spark.sql")
.mode(SaveMode.Append)
.option("es.resource", "{date}/" + dir).save()
这运行得非常慢,大约需要 8 分钟才能写入 287.9 MB / 1513789 条记录。
鉴于网络延迟始终存在,我如何调整 spark 和 elasticsearch 设置以使其更快。
我在本地模式下使用 spark,有 16 个内核和 64GB 内存。 我的 elasticsearch 集群有 1 个主节点和 3 个数据节点,每个节点有 16 个核心和 64GB。
我正在阅读如下文本文件
val readOptions: Map[String, String] = Map("ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"inferSchema" -> "false",
"header" -> "false",
"delimiter" -> "\t",
"comment" -> "#",
"mode" -> "PERMISSIVE")
.....
val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
首先,让我们从您的应用程序中发生的事情开始。 Apache Spark 正在读取 1 个(不是很大)csv
个压缩文件。因此first spark会花时间解压缩数据并在写入elasticsearch
之前对其进行扫描。
这将创建一个 Dataset
/DataFrame
和一个分区 (由您在评论中提到的 df.rdd.getNumPartitions
的结果确认) .
一个直接的解决方案是 repartition
读取数据并缓存它,然后再将其写入 elasticsearch
。现在我不确定你的数据是什么样的,所以决定分区的数量是你这边的基准测试主题。
val input = sqlContext.read.options(readOptions)
.csv(inputFile.getAbsolutePath)
.repartition(100) // 100 is just an example
.cache
我不确定对您的应用程序有多大好处,因为我相信可能还有其他瓶颈(网络 IO、ES 的磁盘类型)。
PS: 在构建 ETL 之前,我应该将 csv 转换为 parquet 文件。这里有真正的性能提升。 (个人意见和基准)
另一个可能的优化是调整 elasticsearch-spark 连接器的 es.batch.size.entries
设置。默认值为 1000
.
设置此参数时需要小心,因为您可能会使elasticsearch过载。我强烈建议您查看可用的配置 here。
希望对您有所帮助!