阿帕奇 Flume。具有多路复用通道选择器的正则表达式提取器
Apache Flume. Regex extractor with multiplexing channel selector
我有以下 Flume 配置,用于获取具有特定数值的服务器日志条目并将它们推送到相应的
kafka 主题。
# Name the components on this agent
a1.sources = r1
a1.channels = c2 c3
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/user/spoolFlume
a1.sources.r1.fileSuffix = .DONE
a1.sources.r1.basenameHeader = true
a1.sources.r1.deserializer.maxLineLength = 8192
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (2725391)
a1.sources.r1.interceptors.i1.serializers = id
a1.sources.r1.interceptors.i1.serializers.id.name = project_id
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = project_id
a1.sources.r1.selector.mapping.2725391 = c3
a1.sources.r1.selector.default = c2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList=kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092
a1.channels.c2.topic = flume_test_002
a1.channels.c2.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181
#default = true
a1.channels.c2.parseAsFlumeEvent = true
a1.channels.c3.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c3.brokerList = kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092
a1.channels.c3.topic = flume_test_003
a1.channels.c3.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181
a1.channels.c3.parseAsFlumeEvent = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c2 c3
我用更复杂的正则表达式做了一些测试,在 cat | grep -E <regexp>
上看起来一切都很好,但是当我试图在 Flume 配置中使用它时,并不是所有的条目都被捕获。
现在我使用一个单词正则表达式,但即使如此,也并非所有条目都被捕获,即并非所有 'right' 条目都转到 kafka 主题(例如,我在日志中有 2 个带有“2725391”的字符串,但在处理我在kafka中只能看到一个条目)。
Flume 配置似乎有问题。任何建议将不胜感激。
更新 2。甚至更多 - 当我使用短文件(少于 100 个字符串)解析时一切正常。对于大约 2GB 的文件,我错过了条目。
更新 3. 我找到了解析所有条目的方法。
a1.sources.r1.decodeErrorPolicy = IGNORE
这很有帮助,因为在 Kafka 通道中解析事件的 header 中有一个奇怪的符号。我不知道它来自哪里,因为在处理之前原始日志中没有这样的符号:/
basename00278388pid2725391�31.28.244.74
解决方案是设置 JAVA_HOME 正确的值并设置以下设置:
a1.sources.r1.decodeErrorPolicy = IGNORE
问题的根源在于日志中某处的非 UTF 字符。
我有以下 Flume 配置,用于获取具有特定数值的服务器日志条目并将它们推送到相应的 kafka 主题。
# Name the components on this agent
a1.sources = r1
a1.channels = c2 c3
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/user/spoolFlume
a1.sources.r1.fileSuffix = .DONE
a1.sources.r1.basenameHeader = true
a1.sources.r1.deserializer.maxLineLength = 8192
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (2725391)
a1.sources.r1.interceptors.i1.serializers = id
a1.sources.r1.interceptors.i1.serializers.id.name = project_id
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = project_id
a1.sources.r1.selector.mapping.2725391 = c3
a1.sources.r1.selector.default = c2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList=kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092
a1.channels.c2.topic = flume_test_002
a1.channels.c2.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181
#default = true
a1.channels.c2.parseAsFlumeEvent = true
a1.channels.c3.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c3.brokerList = kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092
a1.channels.c3.topic = flume_test_003
a1.channels.c3.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181
a1.channels.c3.parseAsFlumeEvent = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c2 c3
我用更复杂的正则表达式做了一些测试,在 cat | grep -E <regexp>
上看起来一切都很好,但是当我试图在 Flume 配置中使用它时,并不是所有的条目都被捕获。
现在我使用一个单词正则表达式,但即使如此,也并非所有条目都被捕获,即并非所有 'right' 条目都转到 kafka 主题(例如,我在日志中有 2 个带有“2725391”的字符串,但在处理我在kafka中只能看到一个条目)。
Flume 配置似乎有问题。任何建议将不胜感激。
更新 2。甚至更多 - 当我使用短文件(少于 100 个字符串)解析时一切正常。对于大约 2GB 的文件,我错过了条目。
更新 3. 我找到了解析所有条目的方法。
a1.sources.r1.decodeErrorPolicy = IGNORE
这很有帮助,因为在 Kafka 通道中解析事件的 header 中有一个奇怪的符号。我不知道它来自哪里,因为在处理之前原始日志中没有这样的符号:/
basename00278388pid2725391�31.28.244.74
解决方案是设置 JAVA_HOME 正确的值并设置以下设置:
a1.sources.r1.decodeErrorPolicy = IGNORE
问题的根源在于日志中某处的非 UTF 字符。