How to change the configuration file of Apache Flume through Java code?
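I am currently working on a big data project that performs sentiment analysis on Twitter's trending topics. I followed Cloudera's tutorial and understood how to get tweets into Hadoop through Flume.
http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/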
flume.conf:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey =
TwitterAgent.sources.Twitter.consumerSecret =
TwitterAgent.sources.Twitter.accessToken =
TwitterAgent.sources.Twitter.accessTokenSecret =
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
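Now I want to extend this to my application: the keywords entry in the Flume configuration file needs to contain the trending topics. I have written Java code that fetches the trending topics, but I am stuck. I don't know how to connect this code to the Flume configuration file, or how to generate a new file whose keywords entry holds the real-time trending topics. I have searched the web a lot for this. I am a beginner in this field, so any information, or at least some alternative approach, would be a great help.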
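A very interesting question!
I agree with @cricket_007's comment: editing the configuration without restarting the Flume agent cannot be achieved.
I can't say much, since I haven't seen your Java code for fetching the trending-topic keywords. However, based on the information you provided, I can think of an alternative (or should I say a workaround), though I haven't tried it myself.
You could modify the TwitterSource.java class like this: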
public void configure(Context context) {
    consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
    consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
    accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
    accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);

    //MODIFY THE FOLLOWING PORTION
    String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
    if (keywordString.trim().length() == 0) {
        keywords = new String[0];
    } else {
        keywords = keywordString.split(",");
        for (int i = 0; i < keywords.length; i++) {
            keywords[i] = keywords[i].trim();
        }
    }
    //UNTIL THIS POINT

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setOAuthConsumerKey(consumerKey);
    cb.setOAuthConsumerSecret(consumerSecret);
    cb.setOAuthAccessToken(accessToken);
    cb.setOAuthAccessTokenSecret(accessTokenSecret);
    cb.setJSONStoreEnabled(true);
    cb.setIncludeEntitiesEnabled(true);
    twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
}
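I have marked with comments above where the keywordString variable is initialized. Instead of reading it from the context supplied by flume.conf, you can call your Java code there (I assume it is a method that can return a comma-separated string of keywords); just remove the context.getString() call.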
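To make the marked portion concrete, here is a minimal sketch of the parsing step on its own, fed by a supplier instead of the Flume Context. The class name TrendingKeywords, the methods parse and fromSupplier, and the hard-coded topic string are all hypothetical; the supplier merely stands in for your own trending-topics method, which I haven't seen.

```java
import java.util.Arrays;
import java.util.function.Supplier;

/** Sketch only: builds the keyword array from a supplier instead of the Flume Context. */
final class TrendingKeywords {

    private TrendingKeywords() {}

    /** Splits a comma-separated string into trimmed, non-empty keywords. */
    static String[] parse(String commaSeparated) {
        if (commaSeparated == null || commaSeparated.trim().isEmpty()) {
            return new String[0];
        }
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }

    /** Hypothetical hook: the supplier stands in for your trending-topics code. */
    static String[] fromSupplier(Supplier<String> trendingTopics) {
        return parse(trendingTopics.get());
    }

    public static void main(String[] args) {
        // Hard-coded stand-in for a real trending-topics lookup.
        String[] keywords = fromSupplier(() -> "hadoop, big data ,, flume");
        System.out.println(Arrays.toString(keywords));
    }
}
```

Inside configure(), the marked portion would then shrink to a single assignment such as keywords = TrendingKeywords.fromSupplier(...), with your own method passed as the supplier. Unlike the original loop, this version also drops empty entries, which seemed safer when the input comes from a live lookup.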
Apart from that, just remove the following statement from flume.conf:
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
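If you would rather do this edit from Java than by hand (or regenerate the keywords line instead of dropping it, as the question asks), something along these lines might work. This is an untested sketch: the class FlumeConfKeywords and its methods are hypothetical, and it only handles the plain "key = value" form used in the flume.conf above. It works on the file's lines so that the license header and comments survive.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: edits the keywords property in the lines of a flume.conf file. */
final class FlumeConfKeywords {

    // Property name taken from the flume.conf shown in the question.
    static final String KEY = "TwitterAgent.sources.Twitter.keywords";

    private FlumeConfKeywords() {}

    /** True if the line assigns the keywords property (comments do not match). */
    static boolean isKeywordsLine(String line) {
        String trimmed = line.trim();
        if (!trimmed.startsWith(KEY)) {
            return false;
        }
        return trimmed.substring(KEY.length()).trim().startsWith("=");
    }

    /** Returns a copy of the lines with the keywords property removed. */
    static List<String> withoutKeywords(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            if (!isKeywordsLine(line)) {
                out.add(line);
            }
        }
        return out;
    }

    /** Returns a copy with the keywords property replaced, or appended if absent. */
    static List<String> withKeywords(List<String> lines, List<String> keywords) {
        String newLine = KEY + " = " + String.join(", ", keywords);
        List<String> out = new ArrayList<>();
        boolean replaced = false;
        for (String line : lines) {
            if (!isKeywordsLine(line)) {
                out.add(line);
            } else if (!replaced) {
                out.add(newLine);   // keep the property at its original position
                replaced = true;
            }
        }
        if (!replaced) {
            out.add(newLine);
        }
        return out;
    }
}
```

To apply it to a real file you could read the lines with java.nio.file.Files.readAllLines(path), pass them through one of the two methods, and write the result back with Files.write(path, lines). Whether a running agent notices the rewritten file depends on how the agent was started; the answer here assumes a restart.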
Hope this helps.