flume 使用 python 套接字或 telnet 时无法正确获取事件

can't get events correctly while flume working with python socket or telnet

我想使用 Flume 在 Python 脚本中收集日志,所以我按照用户指南使用 netcat 源配置 Flume,然后我使用 telnet 和 nc 进行测试,效果很好。

我的配置代码:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

然后我用Python连接Flume,然后像这样发送一些话给它:

import socket
def netcat(hostname, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((hostname, port))
    s.send("test words 1\n")
    s.send("test words 2\n")
    s.send("test words 3\n")
    s.send("test words 4\n")
    s.shutdown(socket.SHUT_WR)
    s.close()

if _name_ == "_main_":
    netcat("127.0.0.1",44444)

出现问题,flume 只能接收 2 行。 flume 日志:

2016-12-28 16:44:32,248 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 2016-12-28 16:44:41,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31 test words 1 } 2016-12-28 16:44:41,815 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32 test words 2 }

我在 Ubuntu&Java1.8 和 Centos&Java 1.7 上得到了相同的结果,在 Python.[=16= 中使用 telnet 模型得到了相同的结果]

配置或Python脚本有什么问题吗?或者有人对这种情况有什么建议吗?

发生这种情况的原因是您没有等待响应返回。默认情况下,Flume 的 netcat 源将为每个事件发回 "OK" 消息。您在发送响应之前终止了连接,这导致进一步消息的处理失败(因为管道已从客户端断开)。

要解决此问题,您需要对 flume.conf 进行以下更改:

a1.sources.r1.ack-every-event=false

这消除了发送 "OK" 的要求,因此停止了失败。

或者,您可以更改 Python 以在每次关闭连接之前等待 "OK" 消息发送。人为地,在 should 中添加睡眠语句也可以解决问题,尽管您会假设处理消息可能需要多长时间。正常情况下可以,但可能有其他情况导致处理延迟。