Converting a CSV file to JSON in Flume
I am trying to pass a CSV file from Flume to Kafka. With the configuration file below I can pass the whole file from Flume to Kafka as-is.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /User/Desktop/logFile.csv
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafkaTopic
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
However, I want to convert it to JSON before it is passed to Kafka for further processing. Can someone tell me how to convert the file from CSV to JSON?
Thanks!!
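I think you need to write your own interceptor.
- Start by implementing the Interceptor interface
- Read the CSV from the Flume event body
- Parse it and compose the JSON
- Put the result back into the event body
Example: https://questforthought.wordpress.com/2014/01/13/using-flume-interceptor-multiplexing/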
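The parse-and-compose step can be sketched roughly as below. This is only a minimal sketch, not a full interceptor: the class name CsvLineToJson, the method names, and the column names passed in are all hypothetical, since the question does not say what columns the file has. It assumes each Flume event body holds one CSV line (the exec source emits one event per line of output), that the column names are known up front, and that every value can be emitted as a JSON string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: converts one CSV line into a flat JSON object.
// It has no Flume dependency so it can be tested on its own.
class CsvLineToJson {

    // Splits one CSV line into fields; supports double-quoted fields
    // and "" as an escaped quote inside a quoted field.
    static List<String> parseCsvLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"'); // escaped quote
                        i++;
                    } else {
                        inQuotes = false;    // closing quote
                    }
                } else {
                    current.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    // Escapes the characters that JSON does not allow raw inside a string.
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.toString();
    }

    // Pairs each assumed column name with the value at the same position.
    // Missing trailing values become empty strings; all values are strings.
    static String toJson(String[] columns, String csvLine) {
        List<String> values = parseCsvLine(csvLine);
        StringBuilder json = new StringBuilder("{");
        for (int i = 0; i < columns.length; i++) {
            if (i > 0) json.append(',');
            String value = i < values.size() ? values.get(i) : "";
            json.append('"').append(escapeJson(columns[i])).append("\":\"")
                .append(escapeJson(value)).append('"');
        }
        return json.append('}').toString();
    }
}
```

Inside a class that implements org.apache.flume.interceptor.Interceptor, the intercept(Event) method would decode event.getBody() to a string, call something like toJson on it, and write the result back with event.setBody(...). The interceptor is then attached to the source through a1.sources.r1.interceptors, with the type property pointing at the interceptor's Builder class. If the file has a header row, it would need to be skipped or used to fill in the column names.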