使用 scala 通过 logstash 批量插入数据到 elasticsearch

Bulk insertion of data to elasticsearch via logstash with scala

我需要通过 scala 代码定期向 elasticsearch 插入大量数据。谷歌搜索时,我发现使用 logstash 来获得较大的插入率,但 logstash 没有任何 java 库或 Api 可以调用,所以我尝试通过 http 客户端连接到它。我不知道使用 http 协议发送大数据是一种好方法,还是使用其他方法更好,例如使用代理、队列、redis 等。

我知道最新版本的 logstash(6.X,7.x) 支持持久队列的使用,因此它可以是使用 logstash 队列的另一种解决方案,但还是通过 http 或 tcp 协议。

另请注意,可靠性是我的首要任务,因为数据不能丢失,并且应该有一种机制来 return 在代码中响应以处理成功或失败。

如有任何想法,我将不胜感激。

更新

似乎使用 http 是健壮的,并且具有基于 here 的确认机制,但如果采用这种方法,scala 中的 http 客户端库更合适,因为我需要按键值格式顺序发送批量数据并以 none-阻塞方式处理响应?

这听起来有点矫枉过正,但在 scala 代码和 logstash 之间引入缓冲层可能会有所帮助,因为您可以摆脱繁重的 HTTP 调用并依赖轻量级协议传输。

考虑在您的 scala 代码和 logstash 之间添加 Kafka 以进行消息排队。 Logstash 可以使用 TCP 传输和批量插入到 ElasticSearch 中可靠地处理来自 Kafka 的消息。另一方面,您可以在构建(批处理)中将消息从您的 Scala 代码放入 Kafka,以使整个管道高效工作。

话虽如此,如果您的容量不足 10,000 msgs/sec,那么您还可以考虑通过调整线程和使用多个 logstash 进程来摆弄 logstash HTTP 输入插件。这是为了降低在您的架构中添加另一个移动部件 (Kafka) 的复杂性。