Flume Kafka Sink 的 TAILDIR 源 - 静态拦截器问题
Flume TAILDIR Source to Kafka Sink- Static Interceptor Issue
我尝试做的场景如下:
1- Flume TAILDIR 源从日志文件中读取并将静态拦截器附加到消息的开头。拦截器由主机名和主机 IP 组成,因为我收到的每条日志消息都需要它。
2- Flume Kafka Producer Sink 从文件中获取这些消息并将它们放入 Kafka 主题中。
Flume配置如下:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks =sink1
tier1.sources.source1.interceptors=i1
tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###
tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1
tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir = /usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs = /usr/software/flumData/data
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type = snappy
所以现在我正在测试,我 运行 一个 Console Kafka Consumer 并且我开始在源文件中写入并且我确实收到了附加了 header 的消息。
示例:
我在源文件中写'test'回车然后保存文件
Flume 检测文件变化,然后将新行发送给 Kafka 生产者。
我的消费者得到以下行:
###HostName###000.00.0.000###test
现在的问题是,拦截器有时无法正常工作。就像 Flume 发送了 2 条消息,一条包含拦截器,另一条包含消息内容。
示例:
我在源文件中写'hi you'回车然后保存文件
Flume 检测文件变化,然后将新行发送给 Kafka 生产者。
我的消费者得到以下两行:
###HostName###000.00.0.000###
hi you
然后终端滚动到新消息内容。
当我在文本文件中键入 'hi you' 时,这种情况总是会发生,并且由于我是从日志文件中读取的,所以它何时发生是不可预测的。
帮助和支持将不胜感激^^
谢谢
所以问题出在 Kafka Consumer。它收到来自 flume
的完整消息
Interceptor + some garbage characters + message
如果其中一个垃圾字符是 \n(Linux 系统中的 LF),那么它会假定它有 2 条消息,而不是 1 条。
我在 Streamsets 中使用 Kafka Consumer 元素,因此更改消息定界符很简单。我做到了 \r\n,现在一切正常。
如果您将完整的消息作为字符串处理,并希望对其应用正则表达式或将其写入文件,那么最好将 \r 和 \n 替换为空字符串。
可以在此处找到完整的答案演练:
我尝试做的场景如下:
1- Flume TAILDIR 源从日志文件中读取并将静态拦截器附加到消息的开头。拦截器由主机名和主机 IP 组成,因为我收到的每条日志消息都需要它。
2- Flume Kafka Producer Sink 从文件中获取这些消息并将它们放入 Kafka 主题中。
Flume配置如下:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks =sink1
tier1.sources.source1.interceptors=i1
tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###
tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1
tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir = /usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs = /usr/software/flumData/data
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type = snappy
所以现在我正在测试,我 运行 一个 Console Kafka Consumer 并且我开始在源文件中写入并且我确实收到了附加了 header 的消息。
示例:
我在源文件中写'test'回车然后保存文件
Flume 检测文件变化,然后将新行发送给 Kafka 生产者。
我的消费者得到以下行:
###HostName###000.00.0.000###test
现在的问题是,拦截器有时无法正常工作。就像 Flume 发送了 2 条消息,一条包含拦截器,另一条包含消息内容。
示例:
我在源文件中写'hi you'回车然后保存文件
Flume 检测文件变化,然后将新行发送给 Kafka 生产者。
我的消费者得到以下两行:
###HostName###000.00.0.000###
hi you
然后终端滚动到新消息内容。
当我在文本文件中键入 'hi you' 时,这种情况总是会发生,并且由于我是从日志文件中读取的,所以它何时发生是不可预测的。
帮助和支持将不胜感激^^
谢谢
所以问题出在 Kafka Consumer。它收到来自 flume
的完整消息Interceptor + some garbage characters + message
如果其中一个垃圾字符是 \n(Linux 系统中的 LF),那么它会假定它有 2 条消息,而不是 1 条。
我在 Streamsets 中使用 Kafka Consumer 元素,因此更改消息定界符很简单。我做到了 \r\n,现在一切正常。
如果您将完整的消息作为字符串处理,并希望对其应用正则表达式或将其写入文件,那么最好将 \r 和 \n 替换为空字符串。
可以在此处找到完整的答案演练: