Loading a CSV file into HDFS using Flume (spool directory as source)
I'm trying to load a CSV file (6 MB) into HDFS using Flume, with spooldir as the source and HDFS as the sink. Here is my configuration file:
# Initialize the agent's source, channel and sink
agent.sources = TwitterExampleDir
agent.channels = memoryChannel
agent.sinks = flumeHDFS
# Spooling-directory source: watches the directory where the file is dropped
agent.sources.TwitterExampleDir.type = spooldir
agent.sources.TwitterExampleDir.spoolDir = /usr/local/word_count_input
# Setting the channel to memory
agent.channels.memoryChannel.type = memory
# Max number of events stored in the memory channel
agent.channels.memoryChannel.capacity = 10000
# Max events per transaction (key is case-sensitive; must not exceed capacity)
agent.channels.memoryChannel.transactionCapacity = 1000
# Setting the sink to HDFS
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.path = hdfs://192.168.220.128:8000/spool5
agent.sinks.flumeHDFS.hdfs.fileType = DataStream
# Write format can be Text or Writable
agent.sinks.flumeHDFS.hdfs.writeFormat = Text
# Keep a single output file open at a time
agent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1
# Roll the file over once it reaches 10 MB (value is in bytes)
agent.sinks.flumeHDFS.hdfs.rollSize = 10485760
# Never roll over based on the number of events
agent.sinks.flumeHDFS.hdfs.rollCount = 0
# Never roll over based on elapsed time
agent.sinks.flumeHDFS.hdfs.rollInterval = 0
# Number of events written to HDFS per flush
agent.sinks.flumeHDFS.hdfs.batchSize = 100
# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600
# Connect source and sink with the channel
agent.sources.TwitterExampleDir.channels = memoryChannel
agent.sinks.flumeHDFS.channel = memoryChannel
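Sizing mistakes in a config like this are easy to miss, so here is a rough pre-flight check sketched in plain Java. It loads a Flume properties file with `java.util.Properties` and checks three things: that the memory channel key is spelled `transactionCapacity` (keys are case-sensitive, so a lowercase `transactioncapacity` is silently ignored and the default applies), that `transactionCapacity` does not exceed `capacity`, and that the HDFS sink's `hdfs.batchSize` fits in one channel transaction. The class `FlumeConfigCheck` and its `check` method are hypothetical helpers, not part of Flume, and the default of 100 assumed for each unset key is taken from the Flume user guide's defaults for these settings.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for a Flume agent config; not part of Flume itself.
public class FlumeConfigCheck {

    // Reads a numeric property, falling back to a default when the key is absent.
    static long getLong(Properties p, String key, long defaultValue) {
        String v = p.getProperty(key);
        return v == null ? defaultValue : Long.parseLong(v.trim());
    }

    // Returns a human-readable list of problems (empty when nothing was found).
    public static List<String> check(Properties p, String channel, String sink) {
        List<String> problems = new ArrayList<>();
        String ch = "agent.channels." + channel + ".";
        String sk = "agent.sinks." + sink + ".hdfs.";

        // Keys are case-sensitive: a lowercase spelling is ignored and the default applies.
        if (p.containsKey(ch + "transactioncapacity")) {
            problems.add(ch + "transactioncapacity is ignored; the key is transactionCapacity");
        }
        // Defaults assumed for unset keys: 100 for all three settings.
        long capacity = getLong(p, ch + "capacity", 100);
        long txCapacity = getLong(p, ch + "transactionCapacity", 100);
        long batchSize = getLong(p, sk + "batchSize", 100);

        if (txCapacity > capacity) {
            problems.add("transactionCapacity " + txCapacity + " exceeds capacity " + capacity);
        }
        // The sink takes up to batchSize events from the channel in one transaction.
        if (batchSize > txCapacity) {
            problems.add("hdfs.batchSize " + batchSize + " exceeds transactionCapacity " + txCapacity);
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Use the file given as the first argument, or a small inline sample.
        Reader in = args.length > 0
            ? Files.newBufferedReader(Paths.get(args[0]))
            : new StringReader(String.join("\n",
                "agent.channels.memoryChannel.capacity = 10000",
                "agent.channels.memoryChannel.transactioncapacity = 1000000",
                "agent.sinks.flumeHDFS.hdfs.batchSize = 100"));
        Properties p = new Properties();
        try (Reader r = in) {
            p.load(r);
        }
        check(p, "memoryChannel", "flumeHDFS").forEach(System.out::println);
    }
}
```

Pass the agent's `.conf` file as the only argument; the channel and sink names are hard-coded to the ones used in this question, so adjust them for other agents.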
After that I get these errors and I don't know why:
2015-02-05 09:01:01,036 [SinkRunner-PollingRunner-DefaultSinkProcessor] ERROR org.apache.flume.sink.hdfs.HDFSEventSink (HDFSEventSink.java:466) - process failed
java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:216)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2564)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.flume.sink.hdfs.BucketWriter.call(BucketWriter.java:274)
at org.apache.flume.sink.hdfs.BucketWriter.call(BucketWriter.java:266)
at org.apache.flume.sink.hdfs.BucketWriter.run(BucketWriter.java:722)
at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:187)
at org.apache.flume.sink.hdfs.BucketWriter.access00(BucketWriter.java:59)
at org.apache.flume.sink.hdfs.BucketWriter.call(BucketWriter.java:719)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-02-05 09:01:01,062 [SinkRunner-PollingRunner-DefaultSinkProcessor] ERROR org.apache.flume.SinkRunner (SinkRunner.java:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:470)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:216)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2564)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.flume.sink.hdfs.BucketWriter.call(BucketWriter.java:274)
at org.apache.flume.sink.hdfs.BucketWriter.call(BucketWriter.java:266)
at org.apache.flume.sink.hdfs.BucketWriter.run(BucketWriter.java:722)
at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:187)
at org.apache.flume.sink.hdfs.BucketWriter.access00(BucketWriter.java:59)
at org.apache.flume.sink.hdfs.BucketWriter.call(BucketWriter.java:719)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
Can anyone help me solve this?
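I was using the Hortonworks Sandbox v2.2. After a long time debugging, I found that the Spark version I had installed manually (v1.2) conflicted with some of the Hortonworks Sandbox libraries, so I switched to the Cloudera QuickStart VM 5.3.0, and now everything works fine.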
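The trace fails inside `FileSystem.loadFileSystems`, which discovers the `FileSystem` implementations on the classpath and asks each one for its scheme. The base `FileSystem.getScheme()` throws exactly this `UnsupportedOperationException` when an implementation does not override it, so the most likely reading is that an older `DistributedFileSystem` class (from a different Hadoop build) was loaded ahead of the one matching Flume's `hadoop-common`, which fits the library conflict described above. As a rough way to look for such a conflict without swapping VMs, the sketch below scans a classpath string for Hadoop jars that appear in more than one version, and for Spark assembly jars that bundle their own Hadoop classes. The class `HadoopJarConflictCheck` and its file-name patterns are hypothetical heuristics based on common jar naming (for example `hadoop-hdfs-2.6.0.jar` or `spark-assembly-1.2.0-hadoop2.4.0.jar`); they do not inspect jar contents.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical classpath heuristic: works on jar file names only, not on jar contents.
public class HadoopJarConflictCheck {

    // Artifact name plus version, e.g. hadoop-hdfs-2.6.0.jar, hadoop-common-2.5.0-cdh5.3.0.jar.
    // Classifier jars such as hadoop-common-2.6.0-tests.jar do not match and are skipped.
    private static final Pattern HADOOP_JAR =
        Pattern.compile("^(hadoop-[a-z-]+?)-(\\d[\\d.]*(?:-[a-z]*\\d[\\w.]*)?)\\.jar$");
    // Assumed naming of a Spark fat jar that embeds Hadoop, e.g. spark-assembly-1.2.0-hadoop2.4.0.jar.
    private static final Pattern SPARK_ASSEMBLY =
        Pattern.compile("^spark-assembly-.*hadoop.*\\.jar$");

    // Maps each Hadoop artifact found in more than one version to the set of versions seen.
    public static Map<String, Set<String>> versionConflicts(List<String> entries) {
        Map<String, Set<String>> versions = new TreeMap<>();
        for (String entry : entries) {
            Matcher m = HADOOP_JAR.matcher(new File(entry).getName());
            if (m.matches()) {
                versions.computeIfAbsent(m.group(1), k -> new TreeSet<>()).add(m.group(2));
            }
        }
        // Keep only artifacts with two or more distinct versions.
        versions.values().removeIf(v -> v.size() < 2);
        return versions;
    }

    // Lists classpath entries whose names suggest a fat jar carrying its own Hadoop classes.
    public static List<String> bundledHadoopJars(List<String> entries) {
        List<String> found = new ArrayList<>();
        for (String entry : entries) {
            if (SPARK_ASSEMBLY.matcher(new File(entry).getName()).matches()) {
                found.add(entry);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Classpath string as the first argument, or this JVM's own classpath as a fallback.
        String cp = args.length > 0 ? args[0] : System.getProperty("java.class.path");
        List<String> entries = Arrays.asList(cp.split(File.pathSeparator));
        versionConflicts(entries).forEach((artifact, vs) ->
            System.out.println("conflicting versions of " + artifact + ": " + vs));
        bundledHadoopJars(entries).forEach(jar ->
            System.out.println("jar bundling its own Hadoop classes: " + jar));
    }
}
```

Feed it the classpath the Flume agent actually runs with, for example the `-cp` value that the `flume-ng` launcher typically echoes on startup; wildcard entries such as `lib/*` (as printed by `hadoop classpath`) need expanding first, and a conflicting copy hidden inside a differently named jar will not be reported.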