Flume with Kafka Source not writing events using file_roll

So, as the title says, I have a Flume agent with a Kafka source that writes to an HDFS location as compressed Avro, and I want to multiplex it so that events are also written to a log file. I am running Flume in a pod inside AKS.

This is what I have tried so far; here is the relevant part of my Flume configuration:

flumeagent.sources = kafkaSource
flumeagent.sources.kafkaSource.channels = kafkaChannel logChannel
flumeagent.sources.kafkaSource.selector.type = multiplexing
flumeagent.sources.kafkaSource.selector.default = kafkaChannel logChannel
flumeagent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
flumeagent.sources.kafkaSource.kafka.consumer.security.protocol = SSL
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.sources.kafkaSource.batchSize  = 5000
flumeagent.sources.kafkaSource.topics  = <TOPICS>
flumeagent.sources.kafkaSource.consumer.group.id  = <GROUP_ID>

flumeagent.channels = kafkaChannel logChannel
flumeagent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
flumeagent.channels.kafkaChannel.kafka.producer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.keep-alive = 120
flumeagent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 30000
flumeagent.channels.kafkaChannel.kafka.consumer.heartbeat.interval.ms = 10000
flumeagent.channels.kafkaChannel.kafka.producer.buffer.memory = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.request.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.producer.max.request.size = 30000
flumeagent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.producer.max.request.size = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.max.partition.fetch.bytes = 50000000
flumeagent.channels.kafkaChannel.transactionCapacity = 1000
flumeagent.channels.kafkaChannel.capacity = 50000
flumeagent.channels.kafkaChannel.kafka.topic = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.consumer.group.id = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.bootstrap.servers = <SERVERS>

flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10

flumeagent.sinks = hdfsSink logSink
flumeagent.sinks.hdfsSink.channel = kafkaChannel
flumeagent.sinks.hdfsSink.type = hdfs
flumeagent.sinks.hdfsSink.hdfs.fileType = DataStream
flumeagent.sinks.hdfsSink.serializer = avro_event
flumeagent.sinks.hdfsSink.serializer.compressionCodec = snappy
flumeagent.sinks.hdfsSink.hdfs.fileSuffix = .avro
flumeagent.sinks.hdfsSink.hdfs.batchSize = 10
flumeagent.sinks.hdfsSink.hdfs.rollSize = 0
flumeagent.sinks.hdfsSink.hdfs.rollCount = 0
flumeagent.sinks.hdfsSink.hdfs.callTimeout = 60000 
flumeagent.sinks.hdfsSink.hdfs.cleanPreviousTemps = true
flumeagent.sinks.hdfsSink.hdfs.inUsePrefix = .
flumeagent.sinks.hdfsSink.hdfs.rollInterval = 3600
flumeagent.sinks.hdfsSink.hdfs.maxPathCountToScan = 2
flumeagent.sinks.hdfsSink.hdfs.timeZone = <TIME_ZONE>
flumeagent.sinks.hdfsSink.hdfs.path = <HDFS PATH>

flumeagent.sinks.logSink.channel = logChannel
flumeagent.sinks.logSink.type = file_roll
flumeagent.sinks.logSink.sink.batchSize = 1
flumeagent.sinks.logSink.sink.directory = /var/log
flumeagent.sinks.logSink.sink.rollInterval = 0
flumeagent.sinks.logSink.sink.pathManager.prefix = monitor
flumeagent.sinks.logSink.sink.pathManager.extension = txt
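
A note on the channel selector in the source section above: because only selector.default is set, events that do not carry the selector header fall through to the default and are copied to both kafkaChannel and logChannel, which is the fan-out I want here. For comparison, a header-driven multiplexing selector would look roughly like this (the header name datatype and its values are purely illustrative, not part of my setup):

flumeagent.sources.kafkaSource.selector.type = multiplexing
# route on a hypothetical event header named "datatype"
flumeagent.sources.kafkaSource.selector.header = datatype
flumeagent.sources.kafkaSource.selector.mapping.audit = logChannel
flumeagent.sources.kafkaSource.selector.mapping.payload = kafkaChannel
# anything without a matching header value still goes to both channels
flumeagent.sources.kafkaSource.selector.default = kafkaChannel logChannel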

To deploy it to AKS I created a configmap using:

kubectl create configmap <name>.properties --from-file=flume.conf=<agent-name>.conf

To apply it I use:

kubectl apply -f flume.yml
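
flume.yml itself is not shown above; for reference, a minimal manifest that mounts that configmap as the agent's config file would look roughly like the sketch below. The image name, mount path and agent start command are placeholders, not my exact manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flumeagent
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flumeagent
  template:
    metadata:
      labels:
        app: flumeagent
    spec:
      containers:
        - name: flume
          image: <FLUME_IMAGE>
          # start the agent named "flumeagent" with the mounted config file
          command: ["flume-ng", "agent", "-n", "flumeagent", "-c", "/opt/flume/conf", "-f", "/opt/flume/conf/flume.conf"]
          volumeMounts:
            - name: flume-config
              mountPath: /opt/flume/conf
      volumes:
        - name: flume-config
          configMap:
            name: <name>.properties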

What I get is that Flume writes to the HDFS location successfully, and the file_roll sink creates files in /var/log, but it never writes any data to them.

What worked was replacing the memory channel with a JDBC channel.

Replace this:

flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10

with:

flumeagent.channels.logChannel.type = jdbc
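
That one-line change was enough for the file_roll sink to start writing events. The JDBC channel persists events in an embedded Derby database by default, so no other settings are strictly required; if you want to cap its size, it also accepts a maximum capacity setting (the value below is arbitrary and just mirrors the old memory-channel capacity):

flumeagent.channels.logChannel.maximum.capacity = 5000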