如何在 Flume 1.7 中编写自定义 ES 接收器
How to write a custom ES sink in Flume 1.7
在 Flume 代理中,我从 Kafka 主题中收集元素,我需要将它们插入 ES。但是我需要在接收器中执行先前的消化过程,所以我需要编写一个自定义接收器以将数据从代理的通道传递到 java 消化模块(我已经编写过)。
谁能给我一个定制水槽的模板,可以作为参考吗? Flumes官网对这个话题并没有多说:
启动 Flume 代理时,自定义接收器的 class 及其依赖项必须包含在代理的 class 路径中。自定义接收器的类型是其 FQCN。
https://flume.apache.org/FlumeUserGuide.html#custom-sink
一旦自定义接收器准备就绪,我如何link以下三个文件来使代理工作:
- 自定义水槽
- 摄取 jar(java 执行摄取过程的模块)
- FlumeAgent.properties
感谢您的任何反馈。一旦我在这个任务中取得进展,我会继续添加信息。
希望您正在尝试使用 Flume 从 Kafka(源)接收事件并将其转发到 ES(接收器),您已经拥有一些数据处理逻辑。
基于这种理解,我建议您在发送到 Sink 之前查看 Flume 拦截器,它负责 altering/filtering 正在运行的事件。
所以你所有改变事件的业务逻辑都可以作为自定义拦截器实现,它应该配置到 Flume 通道。
作为参考,您可以查看 native interceptors source code 已经可用的内容。这可能会让您对 Flume 拦截器框架有所了解。
示例Flume 配置
a1.sources = kafkaSource
a1.sinks = ES_Sink
a1.channels = channel1
a1.sources.kafkaSource.interceptors = i1
a1.sources.kafkaSource.interceptors.i1.type = org.apache.flume.interceptor.<Custom_Interceptor_name>$Builder
a1.sinks.ES_Sink.channel = channel1
a1.sinks.ES_Sink.type = elasticsearch
a1.sinks.ES_Sink.hostNames = 127.0.0.1:9200
在 Flume 代理中,我从 Kafka 主题中收集元素,我需要将它们插入 ES。但是我需要在接收器中执行先前的消化过程,所以我需要编写一个自定义接收器以将数据从代理的通道传递到 java 消化模块(我已经编写过)。
谁能给我一个定制水槽的模板,可以作为参考吗? Flumes官网对这个话题并没有多说: 启动 Flume 代理时,自定义接收器的 class 及其依赖项必须包含在代理的 class 路径中。自定义接收器的类型是其 FQCN。 https://flume.apache.org/FlumeUserGuide.html#custom-sink
一旦自定义接收器准备就绪,我如何link以下三个文件来使代理工作:
- 自定义水槽
- 摄取 jar(java 执行摄取过程的模块)
- FlumeAgent.properties
感谢您的任何反馈。一旦我在这个任务中取得进展,我会继续添加信息。
希望您正在尝试使用 Flume 从 Kafka(源)接收事件并将其转发到 ES(接收器),您已经拥有一些数据处理逻辑。
基于这种理解,我建议您在发送到 Sink 之前查看 Flume 拦截器,它负责 altering/filtering 正在运行的事件。
所以你所有改变事件的业务逻辑都可以作为自定义拦截器实现,它应该配置到 Flume 通道。
作为参考,您可以查看 native interceptors source code 已经可用的内容。这可能会让您对 Flume 拦截器框架有所了解。
示例Flume 配置
a1.sources = kafkaSource
a1.sinks = ES_Sink
a1.channels = channel1
a1.sources.kafkaSource.interceptors = i1
a1.sources.kafkaSource.interceptors.i1.type = org.apache.flume.interceptor.<Custom_Interceptor_name>$Builder
a1.sinks.ES_Sink.channel = channel1
a1.sinks.ES_Sink.type = elasticsearch
a1.sinks.ES_Sink.hostNames = 127.0.0.1:9200