编写一个 Flume 配置,将不断增长的文件上传到 HDFS
Write a Flume configuration to upload the ever growing file to HDFS
我是 Flume 的新手,在配置方面遇到了一些问题。
我在 Oracle VirtualBox 上使用 Hortonworks Sandbox HDP 2.6.5(如果这很重要)。
我的 VM 中有一个文本文件 input_data.txt:
input_data.txt 的内容如下所示:
我使用以下命令来创建并逐渐增加输入:
cat input_data.txt | while read line ; do echo "$line" ; sleep 0.2 ; done > output.txt
我要实现的目标:
1)写一个Flume配置上传不断增长的output.txt文件到HDFS
2) 如果可能 - 每次源文件 (/usr/AUX/output.txt) 更改时,HDFS 中的目标文件都必须更新。
例如:
我打开 /usr/AUX/output.txt ,最后写几个字符串保存:
Nov 16 10:21:22 ephubudw3000 avahi-daemon[990]: Invalid response packet from host 10.0.9.31.
Nov 16 10:21:22 ephubudw3000 avahi-daemon[990]: Invalid response packet from host 10.0.12.143.
...
lolkek
hehehehe
azazaza322
然后这个新数据必须出现在 hdfs://sandbox.hortonworks.com:8020/user/tutorial/
中的 HDFS 目标文件中
我已经试过了:
我创建了这个配置(flume.conf 文件):
a1.sources = src1
a1.sinks = sink1
a1.channels = memoryChannel
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command = cat /usr/AUX/output.txt
a1.sources.src1.batchSize = 1
a1.sources.src1.channels = memoryChannel
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 1000
a1.channels.memoryChannel.transactionCapacity = 100
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channels = memoryChannel
a1.sinks.sink1.hdfs.filetype = DataStream
a1.sinks.sink1.hdfs.writeFormat = Text
a1.sinks.sink1.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/tutorial/
然后我使用以下命令启动 Flume 代理 (a1):
/usr/hdp/current/flume-server/bin/flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n a1
[root@sandbox-hdp AUX]# /usr/hdp/current/flume-server/bin/flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n a1
Warning: JAVA_HOME is not set!
Info: Including Hadoop libraries found via (/bin/hadoop) for HDFS access
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-api-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-log4j12-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/tez/lib/slf4j-api-1.7.5.jar from classpath
Info: Including HBASE libraries found via (/bin/hbase) for HBASE access
Info: Excluding /usr/hdp/2.6.5.0-292/hbase/lib/slf4j-api-1.7.7.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-api-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-log4j12-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/tez/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-api-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-log4j12-1.7.10.jar from classpath
Info: Including Hive libraries found via () for Hive access
+ exec /usr/bin/java -Xmx20m -cp '/etc/flume/conf:/usr/hdp/2.6.5.0-292/flume/lib/*:/usr/hdp/2.6.5.0-292/hadoop/conf:/usr/hdp/2.6.5.0-292/hadoop/lib/activation-1.1.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/apacheds-i18n-2.0.0-M15.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/api-asn1-api-1.0.0-M20.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/api-util-1.0.0-M20.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/asm-3.2.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/avro-1.7.4.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/aws-java-sdk-core-1.10.6.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/aws-java-sdk-kms-1.10.6.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/aws-java-sdk-s3-1.10.6.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/azure-keyvault-core-0.8.0.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/azure-storage-5.4.0.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/commons-beanutils-1.7.0.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/commons-beanutils-core-1.8.0.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/commons-cli-1.2.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/commons-codec-1.4.jar:
...........................
2/hadoop/lib/native/Linux-amd64-64:/usr/hdp/2.6.5.0-292/hadoop/lib/native::/usr/hdp/2.6.5.0-292/hadoop/lib/native/Linux-amd64-64:/usr/hdp/2.6.5.0-292/hadoop/lib/native org.apache.flume.node.Application -f /etc/flume/conf/flume.conf -n a1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.5.0-292/flume/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.5.0-292/flume/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
在此之后我在 /usr/AUX/output.txt 的末尾添加了几个字符串...
什么也没发生。 HDFS 中没有更新的文件
我将不胜感激。
是否可以实现我提到的目标(更新VM中的文件后自动更新HDFS中的文件),我的Flume配置有什么问题?
谢谢!
If it is possible - destination file in HDFS must be updated every time when the source file (/usr/AUX/output.txt) changes
嗯,问题是 HDFS 文件并不意味着 "updated",因为 HDFS 是针对追加优化的文件系统。因此,推荐的模式是创建一个新文件。而且几乎所有的 Hadoop 处理引擎都可以读取整个目录,所以这应该不是问题。
就 Flume 而言,您应该使用 Spooling Directory 源,而不是 cat
或 tail -f
的 Exec 源。否则,Flume 代理不是为读取 "file updates" 而设计的,只能读取 "newly seen" 文件。然后将其标记为完成,然后 moves/ignores 它们稍后。
因此,您会想要这样的东西,它会在您的进程每次运行时生成一个 带时间戳 的文件。这足以让 Flume 说文件是新的,应该是 read/processed。
some_process >> /flume_watcher/output_$(date +%s%3N).txt
参见 Spooling Directory, and why Exec Source is discouraged(红框)。
补充说明:HDP has deprecated Flume,推荐使用 Hortonworks DataFlow (Apache Nifi)。 IE。在 HDP 3.0 沙盒中(如果有的话),你不会有 Flume。因此,不要在上面浪费太多时间。
尝试使用您的原始配置文件,并在您的 conf 文件中进行以下修改:
a1.sinks.sink1.channel = memoryChannel
请注意,您有一个额外的 's',因为根据 Flume documentation,正确的 属性 只是频道。我认为它应该与 hdfs Sink 一起使用 exec Source。
您可能还想修复警告消息:JAVA_HOME 未设置。
我是 Flume 的新手,在配置方面遇到了一些问题。
我在 Oracle VirtualBox 上使用 Hortonworks Sandbox HDP 2.6.5(如果这很重要)。
我的 VM 中有一个文本文件 input_data.txt:
input_data.txt 的内容如下所示:
我使用以下命令来创建并逐渐增加输入:
cat input_data.txt | while read line ; do echo "$line" ; sleep 0.2 ; done > output.txt
我要实现的目标:
1)写一个Flume配置上传不断增长的output.txt文件到HDFS
2) 如果可能 - 每次源文件 (/usr/AUX/output.txt) 更改时,HDFS 中的目标文件都必须更新。
例如: 我打开 /usr/AUX/output.txt ,最后写几个字符串保存:
Nov 16 10:21:22 ephubudw3000 avahi-daemon[990]: Invalid response packet from host 10.0.9.31.
Nov 16 10:21:22 ephubudw3000 avahi-daemon[990]: Invalid response packet from host 10.0.12.143.
...
lolkek
hehehehe
azazaza322
然后这个新数据必须出现在 hdfs://sandbox.hortonworks.com:8020/user/tutorial/
中的 HDFS 目标文件中我已经试过了:
我创建了这个配置(flume.conf 文件):
a1.sources = src1
a1.sinks = sink1
a1.channels = memoryChannel
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command = cat /usr/AUX/output.txt
a1.sources.src1.batchSize = 1
a1.sources.src1.channels = memoryChannel
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 1000
a1.channels.memoryChannel.transactionCapacity = 100
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channels = memoryChannel
a1.sinks.sink1.hdfs.filetype = DataStream
a1.sinks.sink1.hdfs.writeFormat = Text
a1.sinks.sink1.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/tutorial/
然后我使用以下命令启动 Flume 代理 (a1):
/usr/hdp/current/flume-server/bin/flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n a1
[root@sandbox-hdp AUX]# /usr/hdp/current/flume-server/bin/flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n a1
Warning: JAVA_HOME is not set!
Info: Including Hadoop libraries found via (/bin/hadoop) for HDFS access
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-api-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-log4j12-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/tez/lib/slf4j-api-1.7.5.jar from classpath
Info: Including HBASE libraries found via (/bin/hbase) for HBASE access
Info: Excluding /usr/hdp/2.6.5.0-292/hbase/lib/slf4j-api-1.7.7.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-api-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-log4j12-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/tez/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-api-1.7.10.jar from classpath
Info: Excluding /usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-log4j12-1.7.10.jar from classpath
Info: Including Hive libraries found via () for Hive access
+ exec /usr/bin/java -Xmx20m -cp '/etc/flume/conf:/usr/hdp/2.6.5.0-292/flume/lib/*:/usr/hdp/2.6.5.0-292/hadoop/conf:/usr/hdp/2.6.5.0-292/hadoop/lib/activation-1.1.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/apacheds-i18n-2.0.0-M15.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/api-asn1-api-1.0.0-M20.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/api-util-1.0.0-M20.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/asm-3.2.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/avro-1.7.4.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/aws-java-sdk-core-1.10.6.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/aws-java-sdk-kms-1.10.6.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/aws-java-sdk-s3-1.10.6.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/azure-keyvault-core-0.8.0.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/azure-storage-5.4.0.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/commons-beanutils-1.7.0.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/commons-beanutils-core-1.8.0.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/commons-cli-1.2.jar:/usr/hdp/2.6.5.0-292/hadoop/lib/commons-codec-1.4.jar:
...........................
2/hadoop/lib/native/Linux-amd64-64:/usr/hdp/2.6.5.0-292/hadoop/lib/native::/usr/hdp/2.6.5.0-292/hadoop/lib/native/Linux-amd64-64:/usr/hdp/2.6.5.0-292/hadoop/lib/native org.apache.flume.node.Application -f /etc/flume/conf/flume.conf -n a1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.5.0-292/flume/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.5.0-292/flume/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
在此之后我在 /usr/AUX/output.txt 的末尾添加了几个字符串...
什么也没发生。 HDFS 中没有更新的文件
我将不胜感激。 是否可以实现我提到的目标(更新VM中的文件后自动更新HDFS中的文件),我的Flume配置有什么问题?
谢谢!
If it is possible - destination file in HDFS must be updated every time when the source file (/usr/AUX/output.txt) changes
嗯,问题是 HDFS 文件并不意味着 "updated",因为 HDFS 是针对追加优化的文件系统。因此,推荐的模式是创建一个新文件。而且几乎所有的 Hadoop 处理引擎都可以读取整个目录,所以这应该不是问题。
就 Flume 而言,您应该使用 Spooling Directory 源,而不是 cat
或 tail -f
的 Exec 源。否则,Flume 代理不是为读取 "file updates" 而设计的,只能读取 "newly seen" 文件。然后将其标记为完成,然后 moves/ignores 它们稍后。
因此,您会想要这样的东西,它会在您的进程每次运行时生成一个 带时间戳 的文件。这足以让 Flume 说文件是新的,应该是 read/processed。
some_process >> /flume_watcher/output_$(date +%s%3N).txt
参见 Spooling Directory, and why Exec Source is discouraged(红框)。
补充说明:HDP has deprecated Flume,推荐使用 Hortonworks DataFlow (Apache Nifi)。 IE。在 HDP 3.0 沙盒中(如果有的话),你不会有 Flume。因此,不要在上面浪费太多时间。
尝试使用您的原始配置文件,并在您的 conf 文件中进行以下修改:
a1.sinks.sink1.channel = memoryChannel
请注意,您有一个额外的 's',因为根据 Flume documentation,正确的 属性 只是频道。我认为它应该与 hdfs Sink 一起使用 exec Source。
您可能还想修复警告消息:JAVA_HOME 未设置。