用于 Spark 的 elasticsearch-hadoop。从不同索引的 RDD 发送文档(按天)
elasticsearch-hadoop for Spark. Send documents from a RDD in different index (by day)
我使用 Spark 处理复杂的工作流程(解析、清理、机器学习……)。
在工作流结束时,我想将聚合结果发送到 elasticsearch,以便我的门户可以查询数据。
将有两种类型的处理:流式处理和对所有可用数据重新启动工作流的可能性。
现在我使用 elasticsearch-hadoop 尤其是 spark 部分通过 saveJsonToEs(myindex/mytype) 方法将文档发送到 elasticsearch。
目标是使用我们构建的适当模板按天创建索引。
据我所知,您无法在文档中添加对某个功能的考虑,以将其发送到 elasticsearch-hadoop 中的正确索引。
实现此功能的正确方法是什么?
有一个使用 spark 和 bulk 的特殊步骤,以便每个执行者根据每一行的特性将文档发送到正确的索引?
我在 elasticsearch-hadoop 中遗漏了什么吗?
我尝试使用 saveJsonToEs("_bulk") 将 JSON 发送到 _bulk,但模式必须遵循 index/type
感谢 Costin Leau,我找到了解决方案。
只需使用类似 saveJsonToEs("my-index-{date}/my-type") 的动态索引。 "date" 必须是必须发送的文档中的一个功能。
关于elasticsearch的讨论google群:https://groups.google.com/forum/#!topic/elasticsearch/5-LwjQxVlhk
文档:http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html#spark-write-dyn
您可以使用 : ("custom-index-{date}/customtype") 创建动态索引。这可以是给定 rdd 中的任何字段。
如果要格式化日期:("custom-index-{date:{YYYY.mm.dd}}/customtype")
[回答了 Amit_Hora 在评论中提出的问题,因为我没有足够的权限发表评论,所以我在这里添加这个]
我使用 Spark 处理复杂的工作流程(解析、清理、机器学习……)。 在工作流结束时,我想将聚合结果发送到 elasticsearch,以便我的门户可以查询数据。 将有两种类型的处理:流式处理和对所有可用数据重新启动工作流的可能性。
现在我使用 elasticsearch-hadoop 尤其是 spark 部分通过 saveJsonToEs(myindex/mytype) 方法将文档发送到 elasticsearch。 目标是使用我们构建的适当模板按天创建索引。 据我所知,您无法在文档中添加对某个功能的考虑,以将其发送到 elasticsearch-hadoop 中的正确索引。
实现此功能的正确方法是什么? 有一个使用 spark 和 bulk 的特殊步骤,以便每个执行者根据每一行的特性将文档发送到正确的索引? 我在 elasticsearch-hadoop 中遗漏了什么吗?
我尝试使用 saveJsonToEs("_bulk") 将 JSON 发送到 _bulk,但模式必须遵循 index/type
感谢 Costin Leau,我找到了解决方案。 只需使用类似 saveJsonToEs("my-index-{date}/my-type") 的动态索引。 "date" 必须是必须发送的文档中的一个功能。
关于elasticsearch的讨论google群:https://groups.google.com/forum/#!topic/elasticsearch/5-LwjQxVlhk
文档:http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html#spark-write-dyn
您可以使用 : ("custom-index-{date}/customtype") 创建动态索引。这可以是给定 rdd 中的任何字段。
如果要格式化日期:("custom-index-{date:{YYYY.mm.dd}}/customtype") [回答了 Amit_Hora 在评论中提出的问题,因为我没有足够的权限发表评论,所以我在这里添加这个]