Flume HDFS sink only stores one line of data (netcat source)
I am trying to load data into HDFS using Flume 1.7. I created the following configuration:
# Starting with: /opt/flume/bin/flume-ng agent -n Agent -c conf -f /opt/flume/conf/test.conf
# Naming the components on the current agent
Agent.sources = Netcat
Agent.channels = MemChannel
Agent.sinks = LoggerSink hdfs-sink LocalOut
# Describing/Configuring the source
Agent.sources.Netcat.type = netcat
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565
# Describing/Configuring the sink
Agent.sinks.LoggerSink.type = logger
# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>:8020/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 0
Agent.sinks.hdfs-sink.hdfs.rollInterval = 0
Agent.sinks.hdfs-sink.hdfs.idleTimeout = 0
# Writes input into the local filesystem
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
Agent.sinks.LocalOut.type = file_roll
Agent.sinks.LocalOut.sink.directory = /tmp/flume
Agent.sinks.LocalOut.sink.rollInterval = 0
# Describing/Configuring the channel
Agent.channels.MemChannel.type = memory
Agent.channels.MemChannel.capacity = 1000
Agent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.LoggerSink.channel = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
Agent.sinks.LocalOut.channel = MemChannel
Afterwards, I sent the following file to the source using netcat:
cat textfile.csv | nc <IP of flume agent> 56565
The file contains the following entries:
Name1,1
Name2,2
Name3,3
Name4,4
Name5,5
Name6,6
Name7,7
Name8,8
Name9,9
Name10,10
Name11,11
Name12,12
Name13,13
Name14,14
Name15,15
Name16,16
Name17,17
Name18,18
Name19,19
Name20,20
...
Name490,490
Name491,491
Name492,492
The problem I am facing is that Flume writes to HDFS without any errors, but only one line of the transmitted file arrives. If you push the file to the source several times with netcat, Flume sometimes writes multiple files to HDFS that contain more than one line, but rarely all of the lines.
I tried changing rollSize, batchSize, and other HDFS parameters, but that did not really change the behavior.
The file_roll sink with the same configuration works fine.
Does anyone know how to configure this so that all entries are written to HDFS without losing any?
Thanks for your help.
Update 2016-12-01
I removed all sinks except the HDFS sink and changed some parameters. After this, the HDFS sink performed as it should.
The configuration is as follows:
# Naming the components on the current agent
Agent.sources = Netcat
Agent.channels = MemChannel
Agent.sinks = hdfs-sink
# Describing/Configuring the source
Agent.sources.Netcat.type = netcat
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565
# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 100
# Describing/Configuring the channel
Agent.channels.MemChannel.type = memory
Agent.channels.MemChannel.capacity = 1000
Agent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
Does anyone know why it works with this configuration, but no longer works as soon as there are two or more sinks?
I found the solution myself. As far as I understand it, I used the same channel for multiple sinks. Sinks attached to one channel compete for its events, so the faster sink takes most of the entries and only a subset is delivered to the HDFS sink.
After giving each sink its own channel and fanning out the source with the parameter
Agent.sources.Netcat.selector.type = replicating
Flume writes to both the local file and HDFS as expected.
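For reference, a minimal sketch of that fan-out wiring (the channel names MemChannelHdfs and MemChannelFile are illustrative; sink and source settings stay as in the original config):

# One channel per sink, replicated from the source
Agent.sources = Netcat
Agent.channels = MemChannelHdfs MemChannelFile
Agent.sinks = hdfs-sink LocalOut

# The replicating selector copies every event to all listed channels
Agent.sources.Netcat.selector.type = replicating
Agent.sources.Netcat.channels = MemChannelHdfs MemChannelFile

# Each sink drains its own channel, so they no longer compete for events
Agent.sinks.hdfs-sink.channel = MemChannelHdfs
Agent.sinks.LocalOut.channel = MemChannelFile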