Running a Flume agent with a custom source
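I am trying to configure a Flume agent with a custom source from here. I tried to run the agent with the command
flume-ng agent --conf conf --conf-file conf/twitter1.conf --name TwitterAgent
but the agent fails to start and shows the error message below.
The command prompt output is: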
> Info: Sourcing environment configuration script
> /usr/local/lib/apache-flume-1.6.0-bin/conf/flume-env.sh Info:
> Including Hive libraries found via () for Hive access
> + exec /usr/lib/jvm/java-8-oracle/jre/bin/java -Xms500m -Xmx1000m -Dcom.sun.management.jmxremote -cp '/usr/local/lib/apache-flume-1.6.0-bin/conf:/usr/local/lib/apache-flume-1.6.0-bin/lib/*:/usr/local/lib/apache-flume-1.6.0-bin/plugins.d/Twittersource/lib/twitterstream.jar:/usr/local/lib/apache-flume-1.6.0-bin/lib/flume-sources-1.0-SNAPSHOT.jar:/usr/local/lib/apache-flume-1.6.0-bin/plugins.d/Twittersource/lib/*:/usr/local/lib/apache-flume-1.6.0-bin/plugins.d/Twittersource/libext/*:/lib/*'
> -Djava.library.path= org.apache.flume.node.Application --conf-file conf/twitter1.conf --name TwitterAgent 2015-07-13 17:43:51,355
> (lifecycleSupervisor-1-0) [INFO -
> org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)]
> Configuration provider starting 2015-07-13 17:43:51,361
> (conf-file-poller-0) [INFO -
> org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)]
> Reloading configuration file:conf/twitter1.conf 2015-07-13
> 17:43:51,365 (conf-file-poller-0) [WARN -
> org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:102)]
> Configuration property ignored: = 2015-07-13 17:43:51,369
> (conf-file-poller-0) [INFO -
> org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)]
> Processing:Twitsink 2015-07-13 17:43:51,369 (conf-file-poller-0) [INFO
> - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)]
> Processing:Twitsink 2015-07-13 17:43:51,369 (conf-file-poller-0) [INFO
> - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)]
> Added sinks: Twitsink Agent: TwitterAgent 2015-07-13 17:43:51,379
> (conf-file-poller-0) [INFO -
> org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)]
> Post-validation flume configuration contains configuration for agents:
> [TwitterAgent] 2015-07-13 17:43:51,380 (conf-file-poller-0) [INFO -
> org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)]
> Creating channels 2015-07-13 17:43:51,385 (conf-file-poller-0) [INFO -
> org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)]
> Creating instance of channel MemChannel type memory 2015-07-13
> 17:43:51,391 (conf-file-poller-0) [INFO -
> org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)]
> Created channel MemChannel 2015-07-13 17:43:51,391
> (conf-file-poller-0) [INFO -
> org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)]
> Creating instance of source Twitter, type
> com.qb.twitter.TwitterStreamsource 2015-07-13 17:43:51,444
> (conf-file-poller-0) [INFO -
> org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)]
> Creating instance of sink: Twitsink, type: logger 2015-07-13
> 17:43:51,448 (conf-file-poller-0) [INFO -
> org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)]
> Channel MemChannel connected to [Twitter, Twitsink] 2015-07-13
> 17:43:51,456 (conf-file-poller-0) [INFO -
> org.apache.flume.node.Application.startAllComponents(Application.java:138)]
> Starting new configuration:{
> sourceRunners:{Twitter=EventDrivenSourceRunner: {
> source:com.qb.twitter.TwitterStreamsource{name:Twitter,state:IDLE} }}
> sinkRunners:{Twitsink=SinkRunner: {
> policy:org.apache.flume.sink.DefaultSinkProcessor@385d25c0
> counterGroup:{ name:null counters:{} } }}
> channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name:
> MemChannel}} } 2015-07-13 17:43:51,463 (conf-file-poller-0) [INFO -
> org.apache.flume.node.Application.startAllComponents(Application.java:145)]
> Starting Channel MemChannel 2015-07-13 17:43:51,466
> (lifecycleSupervisor-1-0) [INFO -
> org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)]
> Monitored counter group for type: CHANNEL, name: MemChannel:
> Successfully registered new MBean. 2015-07-13 17:43:51,468
> (lifecycleSupervisor-1-0) [INFO -
> org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)]
> Component type: CHANNEL, name: MemChannel started 2015-07-13
> 17:43:51,468 (conf-file-poller-0) [INFO -
> org.apache.flume.node.Application.startAllComponents(Application.java:173)]
> Starting Sink Twitsink 2015-07-13 17:43:51,470 (conf-file-poller-0)
> [INFO -
> org.apache.flume.node.Application.startAllComponents(Application.java:184)]
> Starting Source Twitter 2015-07-13 17:43:51,472
> (lifecycleSupervisor-1-0) [**ERROR** -
> **org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)]
> Unable to start EventDrivenSourceRunner: {
> source:com.qb.twitter.TwitterStreamsource{name:Twitter,state:IDLE} } -
> Exception follows.** java.lang.NoSuchMethodError:
> twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V at
> com.qb.twitter.TwitterStreamsource.start(TwitterStreamsource.java:83)
> at
> org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
> at
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745) 2015-07-13 17:43:51,476
> (lifecycleSupervisor-1-0) [WARN -
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)]
> Component EventDrivenSourceRunner: {
> source:com.qb.twitter.TwitterStreamsource{name:Twitter,state:STOP} }
> stopped, since it could not besuccessfully started due to missing
> dependencies
The configuration file is:
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = Twitsink
TwitterAgent.sources.Twitter.type = com.qb.twitter.TwitterStreamsource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = **********
TwitterAgent.sources.Twitter.consumerSecret = ********
TwitterAgent.sources.Twitter.accessToken = ********
TwitterAgent.sources.Twitter.accessTokenSecret = ********
TwitterAgent.sources.Twitter.keywords = hadoop, big data
TwitterAgent.sinks.Twitsink.type=logger
TwitterAgent.sinks.Twitsink.channel = MemChannel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
You are apparently supplying the wrong version of the twitter4j library:
java.lang.NoSuchMethodError:
twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)
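The current version of Flume (1.6.0) is linked against twitter4j 3.0.3, so a NoSuchMethodError like this one usually means the custom source was compiled against a different twitter4j version than the one loaded at runtime.
Make sure you do not have two versions of twitter4j in your lib directory.
I also assume you know that Flume ships its own TwitterSource, which works quite well, and that you have a reason to implement your own version.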
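One way to check for duplicate twitter4j jars is a small script like the sketch below. It is only an illustration: the function name `find_conflicting_jars` is hypothetical, the directories are taken from the classpath printed in the log above, and it assumes the jars follow the usual `artifact-version.jar` naming. It will not see twitter4j classes bundled inside another jar (for example a fat `twitterstream.jar`).

```python
import re
from collections import defaultdict
from pathlib import Path

# Splits names such as "twitter4j-core-3.0.3.jar" into
# artifact "twitter4j-core" and version "3.0.3".
JAR_PATTERN = re.compile(r"^(?P<artifact>.+?)-(?P<version>\d[\w.\-]*)\.jar$")


def find_conflicting_jars(directories, prefix="twitter4j"):
    """Return {artifact: sorted versions} for artifacts present in more than one version."""
    versions = defaultdict(set)
    for directory in directories:
        root = Path(directory)
        if not root.is_dir():
            continue  # skip directories that do not exist on this machine
        for jar in root.rglob("*.jar"):
            match = JAR_PATTERN.match(jar.name)
            if match and match.group("artifact").startswith(prefix):
                versions[match.group("artifact")].add(match.group("version"))
    return {name: sorted(found) for name, found in versions.items() if len(found) > 1}


if __name__ == "__main__":
    # Assumption: install location copied from the classpath in the question's log.
    flume_home = "/usr/local/lib/apache-flume-1.6.0-bin"
    conflicts = find_conflicting_jars([f"{flume_home}/lib", f"{flume_home}/plugins.d"])
    for artifact, found in conflicts.items():
        print(f"{artifact}: {', '.join(found)}")
```

If the script prints an artifact with more than one version, remove the copy that does not match what your source was built against, or rebuild the source against the version Flume ships.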