如何将 Kafka 读取批处理到 Elasticsearch
How can I batch Kafka reads to Elasticsearch
我对 Kafka 不太熟悉,但我想知道什么是最好的方法
从 Kafka 批量读取数据,这样我就可以使用 Elasticsearch Bulk Api 更快更可靠地加载数据。
顺便说一句,我正在为我的 Kafka 消费者使用 Vertx
谢谢,
我不知道这是否是最好的方法,但是当我开始寻找类似的功能时,我找不到任何现成的框架。我找到了这个项目:
https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer/tree/branch2.0
并开始为它做贡献,因为它没有做我想做的一切,而且也不容易扩展。现在 2.0 版本非常可靠,我们在公司的生产中使用它 processing/indexing 每天 3 亿个事件。
这不是自我推销:) - 只是分享我们如何做同类工作。当然,现在可能还有其他选择。
我使用了 spark streaming,它是使用 Scala 的一个非常简单的实现。
https://github.com/confluentinc/kafka-connect-elasticsearch
或者你可以试试这个来源
https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer
运行 作为标准 Jar
**1。将代码下载到 $INDEXER_HOME 目录中。
**2。 cp $INDEXER_HOME/src/main/resources/kafka-es-indexer.properties.template /your/absolute/path/kafka-es-indexer.properties 文件 - 按照评论
中的说明更新所有相关属性
**3。 cp $INDEXER_HOME/src/main/resources/logback.xml.template /your/absolute/path/logback.xml
指定要存储日志的目录:
根据需要调整最大大小和日志文件数量的值
**4。 build/create 应用程序 jar(确保安装了 MAven):
cd $INDEXER_HOME
mvn clean package
kafka-es-indexer-2.0.jar 将在 $INDEXER_HOME/bin 中创建。所有依赖项都将放在 $INDEXER_HOME/bin/lib 中。所有 JAR 依赖项都通过 kafka-es-indexer-2.0.jar manifest.
链接
**5。编辑您的 $INDEXER_HOME/run_indexer.sh 脚本: -- 如果需要,使其可执行 (chmod a+x $INDEXER_HOME/run_indexer.sh) -- 根据您的环境更新标有 "CHANGE FOR YOUR ENV" 注释的属性
**6。 运行 应用 [使用 JDK1.8] :
./run_indexer.sh
我对 Kafka 不太熟悉,但我想知道什么是最好的方法 从 Kafka 批量读取数据,这样我就可以使用 Elasticsearch Bulk Api 更快更可靠地加载数据。
顺便说一句,我正在为我的 Kafka 消费者使用 Vertx
谢谢,
我不知道这是否是最好的方法,但是当我开始寻找类似的功能时,我找不到任何现成的框架。我找到了这个项目:
https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer/tree/branch2.0
并开始为它做贡献,因为它没有做我想做的一切,而且也不容易扩展。现在 2.0 版本非常可靠,我们在公司的生产中使用它 processing/indexing 每天 3 亿个事件。
这不是自我推销:) - 只是分享我们如何做同类工作。当然,现在可能还有其他选择。
我使用了 spark streaming,它是使用 Scala 的一个非常简单的实现。
https://github.com/confluentinc/kafka-connect-elasticsearch
或者你可以试试这个来源
https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer
运行 作为标准 Jar
**1。将代码下载到 $INDEXER_HOME 目录中。
**2。 cp $INDEXER_HOME/src/main/resources/kafka-es-indexer.properties.template /your/absolute/path/kafka-es-indexer.properties 文件 - 按照评论
中的说明更新所有相关属性**3。 cp $INDEXER_HOME/src/main/resources/logback.xml.template /your/absolute/path/logback.xml
指定要存储日志的目录:
根据需要调整最大大小和日志文件数量的值
**4。 build/create 应用程序 jar(确保安装了 MAven):
cd $INDEXER_HOME
mvn clean package
kafka-es-indexer-2.0.jar 将在 $INDEXER_HOME/bin 中创建。所有依赖项都将放在 $INDEXER_HOME/bin/lib 中。所有 JAR 依赖项都通过 kafka-es-indexer-2.0.jar manifest.
链接**5。编辑您的 $INDEXER_HOME/run_indexer.sh 脚本: -- 如果需要,使其可执行 (chmod a+x $INDEXER_HOME/run_indexer.sh) -- 根据您的环境更新标有 "CHANGE FOR YOUR ENV" 注释的属性
**6。 运行 应用 [使用 JDK1.8] :
./run_indexer.sh