flume1.6 spooldir 源仅摄取文件的一部分
flume1.6 spooldir source ingests only part of file
感谢阅读本文
我正在尝试使用 flume-1.6 收集日志。但我发现并不是所有的
日志文件是使用假脱机控制器源获取的。请给
你的建议!!
在一次测试中,我在日志文件 (170M) 中有 369,189 行,但在另一端我只收到 169,335 行。当我检查 flume.log 时,它说已到达文件末尾并将 .COMPLETED 添加到原始日志文件。
我尝试了不同的日志文件,大约有 300,000 行,在另一端收到了 52,410 条记录。
这是背景和配置:
每个日志文件的大小约为 200M。
flume配置了spooldir source,file channel,kafka sink,如下:
#agent definition
log_agent.sources = spooldirSrc
log_agent.channels = fileChannel
log_agent.sinks = kafkaSink
log_agent.sources.spooldirSrc.channels = fileChannel
log_agent.sinks.kafkaSink.channel = fileChannel
# source define
log_agent.sources.spooldirSrc.type = spooldir
log_agent.sources.spooldirSrc.spoolDir=/log_path/
# kafkaSink definition
log_agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
log_agent.sinks.kafkaSink.topic=log-topic
log_agent.sinks.kafkaSink.brokerList=kafka-host-1:9092,kafka-host-1:9092,kafka-host-1:9092
log_agent.sinks.kafkaSink.requiredAcks=1
log_agent.sinks.kafkaSink.batchSize=100
# fileChannel definition
log_agent.channels.fileChannel.type=file
log_agent.channels.fileChannel.checkpointDir=/path/checkpoint/
log_agent.channels.fileChannel.dataDirs=/path/data
log_agent.channels.fileChannel.capacity=100000
我读过flume document,spooldir 源默认使用Line Deserializer。
我下载了 flume-1.6 源代码,并将文件读取末尾的打印行添加到负责从日志中读取的 ReliableSpoolingFileEventReader。
reader 似乎在到达 EOF 之前就提前结束了。
如有任何建议,我们将不胜感激!
终于找到原因了。 LineDeserializer 依赖 ResettableFileInputStream 来读取文件。可重置文件输入流
读取多代码点 unicode 字符时出现问题。问题在flume1.7中解决。
我用 flume1.7 中的 source code 替换了 flume1.6 ResettableFileInputStream ,重新编译并替换了 flume-ng-core-1.6.0.jar in flume1.6/lib.
然后spooldir源可以消费文件的所有内容。
希望这可以帮助遇到同样 bizzar 问题的人。
感谢阅读本文
我正在尝试使用 flume-1.6 收集日志。但我发现并不是所有的 日志文件是使用假脱机控制器源获取的。请给 你的建议!!
在一次测试中,我在日志文件 (170M) 中有 369,189 行,但在另一端我只收到 169,335 行。当我检查 flume.log 时,它说已到达文件末尾并将 .COMPLETED 添加到原始日志文件。
我尝试了不同的日志文件,大约有 300,000 行,在另一端收到了 52,410 条记录。
这是背景和配置:
每个日志文件的大小约为 200M。
flume配置了spooldir source,file channel,kafka sink,如下:
#agent definition log_agent.sources = spooldirSrc log_agent.channels = fileChannel log_agent.sinks = kafkaSink log_agent.sources.spooldirSrc.channels = fileChannel log_agent.sinks.kafkaSink.channel = fileChannel # source define log_agent.sources.spooldirSrc.type = spooldir log_agent.sources.spooldirSrc.spoolDir=/log_path/ # kafkaSink definition log_agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink log_agent.sinks.kafkaSink.topic=log-topic log_agent.sinks.kafkaSink.brokerList=kafka-host-1:9092,kafka-host-1:9092,kafka-host-1:9092 log_agent.sinks.kafkaSink.requiredAcks=1 log_agent.sinks.kafkaSink.batchSize=100 # fileChannel definition log_agent.channels.fileChannel.type=file log_agent.channels.fileChannel.checkpointDir=/path/checkpoint/ log_agent.channels.fileChannel.dataDirs=/path/data log_agent.channels.fileChannel.capacity=100000
我读过flume document,spooldir 源默认使用Line Deserializer。 我下载了 flume-1.6 源代码,并将文件读取末尾的打印行添加到负责从日志中读取的 ReliableSpoolingFileEventReader。 reader 似乎在到达 EOF 之前就提前结束了。
如有任何建议,我们将不胜感激!
终于找到原因了。 LineDeserializer 依赖 ResettableFileInputStream 来读取文件。可重置文件输入流 读取多代码点 unicode 字符时出现问题。问题在flume1.7中解决。
我用 flume1.7 中的 source code 替换了 flume1.6 ResettableFileInputStream ,重新编译并替换了 flume-ng-core-1.6.0.jar in flume1.6/lib. 然后spooldir源可以消费文件的所有内容。
希望这可以帮助遇到同样 bizzar 问题的人。