将 NiFi 连接到 ElasticSearch
Connecting NiFi to ElasticSearch
我正在尝试解决一项任务,如有任何帮助,我将不胜感激 - 文档链接、论坛链接、除 https://cwiki.apache.org/confluence/display/NIFI/FAQs 之外的其他常见问题解答,或此 post = 中任何有意义的答案).
所以,我有以下任务:
我系统的初始部分每 5-15 分钟从不同的数据库源收集数据。然后我删除重复项,删除垃圾,根据逻辑组合来自不同来源的数据,然后将其作为多个流重定向到系统的第二部分。
据我所知,"NiFi" 可以以最好的方式完成这项任务 =).
目前我可以通过 "GetHTTP" 处理器成功地从 InfluxDB 获取信息。但是,我无法配置相同类型的处理器来使用所有必要选项从 Elastic DB 获取信息。我想在 "now-minus-<5-15 minutes>" 到 "now" 的时间段内每 5-15 分钟接收一次数据。 (取决于调度程序周期)和几个额外的过滤器。如果我理解正确,这可以通过订阅“_index”或按所需间隔定期请求数据库来实现。
我知道 NiFi 有几个专门为 Elasticsearch 设计的处理器(FetchElasticsearch5、FetchElasticsearchHttp、QueryElasticsearchHttp、ScrollElasticsearchHttp)以及 GetHTTP 和 PostHTTP 处理器。然而,不幸的是,我缺乏信息,甚至更好 - 示例 - 如何为我的目的配置他们的 "Properties" =(。
FetchElasticsearchHttp、QueryElasticsearchHttp 有什么区别?哪一个更适合我的任务? GetHTTP 和 QueryElasticsearchHttp 除了几个特定字段外还有什么区别?如果我根据需要调整 GetHTTP,它会以相同的方式执行吗?
有什么建议吗?
如有任何帮助,我将不胜感激。
ElasticsearchHttp 处理器试图通过根据您设置的属性生成适当的 REST API 调用来简化与 ES 的交互。如果您知道您需要的完整 URL,您可以使用 GetHttp 或 InvokeHttp。然而,ESHttp 处理器让你只输入你正在寻找的东西,它会生成 URL 和 return 结果。
FetchElasticsearch(及其变体)用于在您知道标识符时获取特定文档。这有时在 search/query 之后使用,以便在您知道需要哪些文件后一次 return 个文件。
QueryElasticsearchHttp is for when you want to do a Lucene-style query of the documents, when you don't necessarily know which documents you want. It will only return up to the value of index.max_result_window for that index. To get more records, you can use ScrollElasticsearchHttp afterwards. NOTE: QueryElasticsearchHttp expects a query that will work as the "q" parameter of the URL. This "mini-language" does not support all fields/operators (see here 了解更多详情)。
对于您的用例,您可能需要 InvokeHttp in order to issue the kind of query you describe. This article describes how to issue a query for the last 15 minutes. Once your results are returned, you might need some combination of EvaluateJsonPath and/or SplitJson 来处理各个文档,有关详细信息,请参阅 Elasticsearch REST API 文档(和 NiFi 处理器文档)。
我正在尝试解决一项任务,如有任何帮助,我将不胜感激 - 文档链接、论坛链接、除 https://cwiki.apache.org/confluence/display/NIFI/FAQs 之外的其他常见问题解答,或此 post = 中任何有意义的答案).
所以,我有以下任务: 我系统的初始部分每 5-15 分钟从不同的数据库源收集数据。然后我删除重复项,删除垃圾,根据逻辑组合来自不同来源的数据,然后将其作为多个流重定向到系统的第二部分。 据我所知,"NiFi" 可以以最好的方式完成这项任务 =).
目前我可以通过 "GetHTTP" 处理器成功地从 InfluxDB 获取信息。但是,我无法配置相同类型的处理器来使用所有必要选项从 Elastic DB 获取信息。我想在 "now-minus-<5-15 minutes>" 到 "now" 的时间段内每 5-15 分钟接收一次数据。 (取决于调度程序周期)和几个额外的过滤器。如果我理解正确,这可以通过订阅“_index”或按所需间隔定期请求数据库来实现。
我知道 NiFi 有几个专门为 Elasticsearch 设计的处理器(FetchElasticsearch5、FetchElasticsearchHttp、QueryElasticsearchHttp、ScrollElasticsearchHttp)以及 GetHTTP 和 PostHTTP 处理器。然而,不幸的是,我缺乏信息,甚至更好 - 示例 - 如何为我的目的配置他们的 "Properties" =(。
FetchElasticsearchHttp、QueryElasticsearchHttp 有什么区别?哪一个更适合我的任务? GetHTTP 和 QueryElasticsearchHttp 除了几个特定字段外还有什么区别?如果我根据需要调整 GetHTTP,它会以相同的方式执行吗?
有什么建议吗?
如有任何帮助,我将不胜感激。
ElasticsearchHttp 处理器试图通过根据您设置的属性生成适当的 REST API 调用来简化与 ES 的交互。如果您知道您需要的完整 URL,您可以使用 GetHttp 或 InvokeHttp。然而,ESHttp 处理器让你只输入你正在寻找的东西,它会生成 URL 和 return 结果。
FetchElasticsearch(及其变体)用于在您知道标识符时获取特定文档。这有时在 search/query 之后使用,以便在您知道需要哪些文件后一次 return 个文件。
QueryElasticsearchHttp is for when you want to do a Lucene-style query of the documents, when you don't necessarily know which documents you want. It will only return up to the value of index.max_result_window for that index. To get more records, you can use ScrollElasticsearchHttp afterwards. NOTE: QueryElasticsearchHttp expects a query that will work as the "q" parameter of the URL. This "mini-language" does not support all fields/operators (see here 了解更多详情)。
对于您的用例,您可能需要 InvokeHttp in order to issue the kind of query you describe. This article describes how to issue a query for the last 15 minutes. Once your results are returned, you might need some combination of EvaluateJsonPath and/or SplitJson 来处理各个文档,有关详细信息,请参阅 Elasticsearch REST API 文档(和 NiFi 处理器文档)。