How to stop a Flume agent gracefully

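Many websites suggest using kill -9 to stop a Flume agent.

However, when I stop the agent with kill -9, the files written by the HDFS sink stay open forever (they remain as *.tmp).

How can I stop a Flume agent gracefully, so that it closes all the files it is writing on HDFS before it exits? My agent configuration is below.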

#Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

#Configure the Kafka Source
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 1000
agent.sources.r1.batchDurationMillis = 3000
agent.sources.r1.kafka.bootstrap.servers = <server1>:6667,<server2>:6667
agent.sources.r1.kafka.topics = 1-agent1-thread
agent.sources.r1.kafka.consumer.group.id = flume_agent_thread

#Describe the sink
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = /user/flume/kafka-data/1-agent1-thread/%y%m%d/%H
agent.sinks.k1.hdfs.filePrefix = 1-agent1-thread

#Sink file format (set to address an encoding problem)
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text

#Sink roll settings (set to address the problem of too many HDFS files)
### Roll a file after a certain number of events ###
agent.sinks.k1.hdfs.rollInterval = 0
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 10000
agent.sinks.k1.hdfs.batchSize = 100
agent.sinks.k1.hdfs.idleTimeout = 300
agent.sinks.k1.hdfs.closeTries = 0
agent.sinks.k1.hdfs.retryInterval = 200

#Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000

#Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

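Use kill -TERM, which is the standard way to terminate all Hadoop-related services.

Flume registers a shutdown hook that runs when the process receives SIGTERM (see Application.java), and it should clean up all open files.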

ShutdownHook() -> stop() -> stopAllComponents()
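
To illustrate why the signal matters, here is a minimal shell sketch, not Flume code. run_worker is a hypothetical stand-in for the agent: it creates a .tmp file and renames it only inside a TERM handler. SIGKILL (kill -9) cannot be caught, so with kill -9 the handler never runs and the .tmp file stays behind.

```shell
#!/bin/sh
# Hypothetical stand-in for an agent that finalizes its output on SIGTERM.
# run_worker and the file names are illustrative assumptions, not Flume APIs.
run_worker() {
    out="$1"
    : > "$out.tmp"                          # "open" file, like an HDFS *.tmp
    # On SIGTERM, finalize the file and exit. SIGKILL would skip this handler.
    trap 'mv "$out.tmp" "$out"; exit 0' TERM
    while :; do
        sleep 1 &
        wait $!                             # wait is interruptible by signals
    done
}
```

Running run_worker in the background and sending it kill -TERM leaves the finalized file; sending kill -9 instead leaves only the .tmp file.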

kill -9 should only be used if you have already tried kill -TERM and the Flume agent is still hanging.
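
The advice above can be sketched as a small shell helper. This is only a sketch under assumptions: stop_gracefully, its arguments, and the 60-second default are hypothetical, and how you find the agent's PID depends on how you started it.

```shell
#!/bin/sh
# Sketch: send SIGTERM first, escalate to SIGKILL only after a timeout.
# stop_gracefully is a hypothetical helper, not part of Flume.
# Returns 0 if the process exited after SIGTERM, 1 if SIGKILL was needed,
# 2 if the process could not be signalled at all.
stop_gracefully() {
    pid="$1"
    timeout="${2:-60}"      # seconds to wait before escalating (assumed default)

    # SIGTERM lets the JVM run its shutdown hooks.
    kill -TERM "$pid" 2>/dev/null || return 2   # no such process, or no permission

    waited=0
    # kill -0 sends no signal; it only checks that the process still exists.
    while kill -0 "$pid" 2>/dev/null; do
        if [ "$waited" -ge "$timeout" ]; then
            # Last resort: files still open on HDFS may be left as *.tmp.
            kill -KILL "$pid" 2>/dev/null
            return 1
        fi
        sleep 1
        waited=$((waited + 1))
    done
    return 0
}
```

For example, stop_gracefully "$FLUME_PID" 120 would give the agent up to two minutes to close its files, where FLUME_PID is a variable you set to the agent's process ID.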