Flume 超过内存事务容量后不恢复
Flume doesn't recover after memory transaction capacity is exceeded
我正在创建 Flume 代理的概念验证,它将缓冲事件并在接收器不可用时停止使用来自源的事件。只有当 sink 再次可用时,才应该处理缓冲的事件,然后 source 重新开始消费。
为此,我创建了一个简单的代理,它从 SpoolDir 读取并写入文件。为了模拟接收器服务已关闭,我更改了文件权限,因此 Flume 无法写入。然后我开始 Flume 一些事件被缓冲在内存通道中,并且当通道容量已满时它停止消耗事件,正如预期的那样。一旦文件变得可写,接收器就能够处理事件并且 Flume 恢复。但是,这仅在未超过交易容量时才有效。一旦超过交易容量,Flume就再也回不去了,一直写着下面的错误:
2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR -
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to
deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to process transaction
at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction,
capacity 4 full, consider committing more frequently, increasing capacity, or
increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
... 3 more
一旦内存中缓冲的事件数超过事务容量 (4),就会发生此错误。不明白为什么,因为fileout的batchSize是1,所以应该把events一个个取出来。
这是我正在使用的配置:
agent.sources = spool-src
agent.channels = mem-channel
agent.sinks = fileout
agent.sources.spool-src.channels = mem-channel
agent.sources.spool-src.type = spooldir
agent.sources.spool-src.spoolDir = /tmp/flume-spool
agent.sources.spool-src.batchSize = 1
agent.channels.mem-channel.type = memory
agent.channels.mem-channel.capacity = 10
agent.channels.mem-channel.transactionCapacity = 4
agent.sinks.fileout.channel = mem-channel
agent.sinks.fileout.type = file_roll
agent.sinks.fileout.sink.directory = /tmp/flume-output
agent.sinks.fileout.sink.rollInterval = 0
agent.sinks.fileout.batchSize = 1
我已经用不同的通道容量和交易容量值(例如 3 和 3)测试了此配置,但没有发现通道容量已满并且 Flume 能够恢复。
在 flume 邮件列表中有人告诉我可能 this bug 影响了我的概念验证。该错误导致批处理大小为 100,即使它在配置中指定的不同。我重新 运行 测试,将源和接收器 batchSizes 设置为 100,将内存通道 t运行sactionCapacity 设置为 100,将其容量设置为 300。使用这些值,概念证明的工作原理与预期。
我正在创建 Flume 代理的概念验证,它将缓冲事件并在接收器不可用时停止使用来自源的事件。只有当 sink 再次可用时,才应该处理缓冲的事件,然后 source 重新开始消费。
为此,我创建了一个简单的代理,它从 SpoolDir 读取并写入文件。为了模拟接收器服务已关闭,我更改了文件权限,因此 Flume 无法写入。然后我开始 Flume 一些事件被缓冲在内存通道中,并且当通道容量已满时它停止消耗事件,正如预期的那样。一旦文件变得可写,接收器就能够处理事件并且 Flume 恢复。但是,这仅在未超过交易容量时才有效。一旦超过交易容量,Flume就再也回不去了,一直写着下面的错误:
2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR -
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to
deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to process transaction
at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction,
capacity 4 full, consider committing more frequently, increasing capacity, or
increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
... 3 more
一旦内存中缓冲的事件数超过事务容量 (4),就会发生此错误。不明白为什么,因为fileout的batchSize是1,所以应该把events一个个取出来。
这是我正在使用的配置:
agent.sources = spool-src
agent.channels = mem-channel
agent.sinks = fileout
agent.sources.spool-src.channels = mem-channel
agent.sources.spool-src.type = spooldir
agent.sources.spool-src.spoolDir = /tmp/flume-spool
agent.sources.spool-src.batchSize = 1
agent.channels.mem-channel.type = memory
agent.channels.mem-channel.capacity = 10
agent.channels.mem-channel.transactionCapacity = 4
agent.sinks.fileout.channel = mem-channel
agent.sinks.fileout.type = file_roll
agent.sinks.fileout.sink.directory = /tmp/flume-output
agent.sinks.fileout.sink.rollInterval = 0
agent.sinks.fileout.batchSize = 1
我已经用不同的通道容量和交易容量值(例如 3 和 3)测试了此配置,但没有发现通道容量已满并且 Flume 能够恢复。
在 flume 邮件列表中有人告诉我可能 this bug 影响了我的概念验证。该错误导致批处理大小为 100,即使它在配置中指定的不同。我重新 运行 测试,将源和接收器 batchSizes 设置为 100,将内存通道 t运行sactionCapacity 设置为 100,将其容量设置为 300。使用这些值,概念证明的工作原理与预期。