Spark Streaming 和 ElasticSearch - 无法写入所有条目
Spark Streaming and ElasticSearch - Could not write all entries
我目前正在编写一个由生产者和消费者组成的 Scala 应用程序。生产者从外部源获取一些数据并将它们写入 Kafka。消费者从 Kafka 读取并写入 Elasticsearch。
消费者基于 Spark Streaming,每 5 秒从 Kafka 获取新消息并将它们写入 ElasticSearch。问题是我无法写入 ES,因为我收到很多错误,如下所示:
ERROR] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]:
Error in TaskCompletionListener
org.elasticsearch.hadoop.EsHadoopException: Could not write all
entries [3/26560] (maybe ES was overloaded?). Bailing out... at
org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225)
~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at
org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236)
~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at
org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125)
~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at
org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write.apply$mcV$sp(EsRDDWriter.scala:33)
~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at
org.apache.spark.TaskContextImpl$$anon.onTaskCompletion(TaskContextImpl.scala:57)
~[spark-core_2.10-1.2.1.jar:1.2.1] at
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted.apply(TaskContextImpl.scala:68)
[spark-core_2.10-1.2.1.jar:1.2.1] at
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted.apply(TaskContextImpl.scala:66)
[spark-core_2.10-1.2.1.jar:1.2.1] at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[na:na] at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[na:na] at
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
[spark-core_2.10-1.2.1.jar:1.2.1] at
org.apache.spark.scheduler.Task.run(Task.scala:58)
[spark-core_2.10-1.2.1.jar:1.2.1] at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
[spark-core_2.10-1.2.1.jar:1.2.1] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_65] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_65] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
考虑到生产者每 15 秒写 6 条消息,所以我真的不明白这 "overload" 怎么可能发生(我什至清理了主题并刷新了所有旧消息,我认为这是相关的抵消问题)。 Spark Streaming每5秒执行一次的任务可以用下面的代码来概括:
val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2)))
//TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter
log.info(s"*** EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()}***")
convertedResult.foreachRDD(rdd => {
rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
})
如果我尝试打印消息而不是发送到 ES,一切都很好,我实际上只看到 6 条消息。为什么我不能写入 ES?
为了完整起见,我使用这个库写入 ES:elasticsearch-spark_2.10 最新的 beta 版本。
经过多次重试,我发现了一种写入 ElasticSearch 且不会出现任何错误的方法。基本上将参数 "es.batch.size.entries" -> "1"
传递给 saveToES 方法就解决了这个问题。我不明白为什么使用默认或任何其他批量大小会导致上述错误,考虑到如果我尝试写入的内容超过允许的最大批量大小,我会收到一条错误消息,而不是更少。
此外,我注意到实际上我是在写 ES 但不是我所有的消息,我每批丢失 1 到 3 条消息。
其中一种可能性是 cluster/shard 状态为红色。请解决此问题,这可能是由于未分配的副本造成的。一旦状态变为绿色,API 调用就成功了。
当我在 Spark 上将数据帧推送到 ES 时,出现了相同的错误消息。即使使用 "es.batch.size.entries" -> "1"
配置,我也有同样的错误。
一旦我在 ES 中增加了线程池,我就可以解决这个问题。
例如,
批量池
threadpool.bulk.type: fixed
threadpool.bulk.size: 600
threadpool.bulk.queue_size: 30000
这是一个文件写入冲突。
例如:
多个文档指定相同的_id供Elasticsearch使用。
这些文件位于不同的分区。
Spark 将多个分区写入 ES 同时。
结果是 Elasticsearch 一次接收单个文档的多个更新 - 来自多个来源/通过多个节点/包含不同的数据
"I was losing between 1 and 3 messages per batch."
- 批量大小 > 1 时的失败次数波动
- 如果批量写入大小为“1”则成功
就像这里已经提到的,这是一个文档写入冲突。
您的 convertedResult
数据流包含 多个具有相同 ID 的记录。当作为同一批次的一部分写入弹性时会产生上述错误。
可能的解决方案:
- 为每条记录生成唯一的 ID。根据您的用例,可以通过几种不同的方式完成。例如,一种常见的解决方案是通过组合 id 和 lastModifiedDate 字段来创建一个新字段,并在写入 elastic 时将该字段用作 id。
- 根据 id 对记录执行重复数据删除 - select 仅具有特定 id 的记录并丢弃其他重复项。根据您的用例,这可能是最新的记录(基于时间戳字段)、最完整的(大多数字段包含数据)等。
#1 解决方案将存储您在流中收到的所有记录。
#2 解决方案将根据您的重复数据删除逻辑仅存储特定 ID 的 唯一记录。此结果与设置 "es.batch.size.entries" -> "1"
相同,只是您不会通过一次写入一条记录来限制性能。
只是添加此错误的另一个潜在原因,希望它对某人有所帮助。
如果您的 Elasticsearch 索引有子文档,则:
- 如果你使用的是自定义路由字段(不是_id),那么根据
文档 不保证文档的唯一性。
这可能会导致从 spark 更新时出现问题。
- 如果您使用标准 _id,唯一性将被保留,但是您需要确保在从 Spark 写入 Elasticsearch 时提供以下选项:
- es.mapping.join
- es.mapping.routing
我目前正在编写一个由生产者和消费者组成的 Scala 应用程序。生产者从外部源获取一些数据并将它们写入 Kafka。消费者从 Kafka 读取并写入 Elasticsearch。
消费者基于 Spark Streaming,每 5 秒从 Kafka 获取新消息并将它们写入 ElasticSearch。问题是我无法写入 ES,因为我收到很多错误,如下所示:
ERROR] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]: Error in TaskCompletionListener org.elasticsearch.hadoop.EsHadoopException: Could not write all entries [3/26560] (maybe ES was overloaded?). Bailing out... at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write.apply$mcV$sp(EsRDDWriter.scala:33) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.apache.spark.TaskContextImpl$$anon.onTaskCompletion(TaskContextImpl.scala:57) ~[spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted.apply(TaskContextImpl.scala:68) [spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [na:na] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.scheduler.Task.run(Task.scala:58) [spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2.1.jar:1.2.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
考虑到生产者每 15 秒写 6 条消息,所以我真的不明白这 "overload" 怎么可能发生(我什至清理了主题并刷新了所有旧消息,我认为这是相关的抵消问题)。 Spark Streaming每5秒执行一次的任务可以用下面的代码来概括:
val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2)))
//TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter
log.info(s"*** EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()}***")
convertedResult.foreachRDD(rdd => {
rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
})
如果我尝试打印消息而不是发送到 ES,一切都很好,我实际上只看到 6 条消息。为什么我不能写入 ES?
为了完整起见,我使用这个库写入 ES:elasticsearch-spark_2.10 最新的 beta 版本。
经过多次重试,我发现了一种写入 ElasticSearch 且不会出现任何错误的方法。基本上将参数 "es.batch.size.entries" -> "1"
传递给 saveToES 方法就解决了这个问题。我不明白为什么使用默认或任何其他批量大小会导致上述错误,考虑到如果我尝试写入的内容超过允许的最大批量大小,我会收到一条错误消息,而不是更少。
此外,我注意到实际上我是在写 ES 但不是我所有的消息,我每批丢失 1 到 3 条消息。
其中一种可能性是 cluster/shard 状态为红色。请解决此问题,这可能是由于未分配的副本造成的。一旦状态变为绿色,API 调用就成功了。
当我在 Spark 上将数据帧推送到 ES 时,出现了相同的错误消息。即使使用 "es.batch.size.entries" -> "1"
配置,我也有同样的错误。
一旦我在 ES 中增加了线程池,我就可以解决这个问题。
例如,
批量池
threadpool.bulk.type: fixed
threadpool.bulk.size: 600
threadpool.bulk.queue_size: 30000
这是一个文件写入冲突。
例如:
多个文档指定相同的_id供Elasticsearch使用。
这些文件位于不同的分区。
Spark 将多个分区写入 ES 同时。
结果是 Elasticsearch 一次接收单个文档的多个更新 - 来自多个来源/通过多个节点/包含不同的数据
"I was losing between 1 and 3 messages per batch."
- 批量大小 > 1 时的失败次数波动
- 如果批量写入大小为“1”则成功
就像这里已经提到的,这是一个文档写入冲突。
您的 convertedResult
数据流包含 多个具有相同 ID 的记录。当作为同一批次的一部分写入弹性时会产生上述错误。
可能的解决方案:
- 为每条记录生成唯一的 ID。根据您的用例,可以通过几种不同的方式完成。例如,一种常见的解决方案是通过组合 id 和 lastModifiedDate 字段来创建一个新字段,并在写入 elastic 时将该字段用作 id。
- 根据 id 对记录执行重复数据删除 - select 仅具有特定 id 的记录并丢弃其他重复项。根据您的用例,这可能是最新的记录(基于时间戳字段)、最完整的(大多数字段包含数据)等。
#1 解决方案将存储您在流中收到的所有记录。
#2 解决方案将根据您的重复数据删除逻辑仅存储特定 ID 的 唯一记录。此结果与设置 "es.batch.size.entries" -> "1"
相同,只是您不会通过一次写入一条记录来限制性能。
只是添加此错误的另一个潜在原因,希望它对某人有所帮助。 如果您的 Elasticsearch 索引有子文档,则:
- 如果你使用的是自定义路由字段(不是_id),那么根据 文档 不保证文档的唯一性。 这可能会导致从 spark 更新时出现问题。
- 如果您使用标准 _id,唯一性将被保留,但是您需要确保在从 Spark 写入 Elasticsearch 时提供以下选项:
- es.mapping.join
- es.mapping.routing