如何将 Kafka 读取批处理到 Elasticsearch

How can I batch Kafka reads to Elasticsearch

我对 Kafka 不太熟悉,但我想知道什么是最好的方法 从 Kafka 批量读取数据,这样我就可以使用 Elasticsearch Bulk Api 更快更可靠地加载数据。

顺便说一句,我正在为我的 Kafka 消费者使用 Vertx

谢谢,

我不知道这是否是最好的方法,但是当我开始寻找类似的功能时,我找不到任何现成的框架。我找到了这个项目:

https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer/tree/branch2.0

并开始为它做贡献,因为它没有做我想做的一切,而且也不容易扩展。现在 2.0 版本非常可靠,我们在公司的生产中使用它 processing/indexing 每天 3 亿个事件。

这不是自我推销:) - 只是分享我们如何做同类工作。当然,现在可能还有其他选择。

我使用了 spark streaming,它是使用 Scala 的一个非常简单的实现。

https://github.com/confluentinc/kafka-connect-elasticsearch

或者你可以试试这个来源

https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer

运行 作为标准 Jar

**1。将代码下载到 $INDEXER_HOME 目录中。

**2。 cp $INDEXER_HOME/src/main/resources/kafka-es-indexer.properties.template /your/absolute/path/kafka-es-indexer.properties 文件 - 按照评论

中的说明更新所有相关属性

**3。 cp $INDEXER_HOME/src/main/resources/logback.xml.template /your/absolute/path/logback.xml

指定要存储日志的目录:

根据需要调整最大大小和日志文件数量的值

**4。 build/create 应用程序 jar(确保安装了 MAven):

cd $INDEXER_HOME
mvn clean package

kafka-es-indexer-2.0.jar 将在 $INDEXER_HOME/bin 中创建。所有依赖项都将放在 $INDEXER_HOME/bin/lib 中。所有 JAR 依赖项都通过 kafka-es-indexer-2.0.jar manifest.

链接

**5。编辑您的 $INDEXER_HOME/run_indexer.sh 脚本: -- 如果需要,使其可执行 (chmod a+x $INDEXER_HOME/run_indexer.sh) -- 根据您的环境更新标有 "CHANGE FOR YOUR ENV" 注释的属性

**6。 运行 应用 [使用 JDK1.8] :

./run_indexer.sh