Apache Flume 拦截器 - 无法实例化生成器
Apache Flume interceptor - Could not instantiate Builder
我为 Apache Flume 1.7 编写了一个自定义拦截器。该拦截器需要为来自 Kafka 源、且主题与配置中定义的正则表达式匹配的所有事件设置一个特殊的 header。但它无法正常工作。我对 Java 了解不多,请帮我解决这个问题。
我的部分配置 /etc/flume-ng/conf/flume.conf:
######################## kafka source ########################
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.interceptors = i1
agent.sources.kafka_source.interceptors.i1.type = org.apache.flume.interceptor.TopicRotationHeaderInterceptor
agent.sources.kafka_source.interceptors.i1.regex = stat_.+
agent.sources.kafka_source.interceptors.i1.value = hourly
agent.sources.kafka_source.interceptors.i1.default = daily
我在 flume.log 中遇到错误:
31 Jul 2017 18:41:11,819 ERROR [conf-file-poller-0] (org.apache.flume.channel.ChannelProcessor.configureInterceptors:118) - Could not instantiate Builder. Exception follows.
java.lang.InstantiationException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
31 Jul 2017 18:41:11,823 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSources:361) -
Source kafka_source has been removed due to an error during configuration
org.apache.flume.FlumeException: Interceptor.Builder not constructable.
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:119)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InstantiationException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
... 12 more
Caused by: java.lang.NoSuchMethodException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
拦截器源码:
package org.apache.flume.interceptor;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Interceptor that stamps every event with a rotation-period header chosen by topic.
 *
 * <p>The event's {@code topic} header is matched (full match) against a configured
 * regular expression. When it matches, the {@code rotation} header is set to the
 * configured {@code value}; otherwise, including when the event carries no
 * {@code topic} header at all, it is set to the configured {@code default}.
 *
 * <p>Properties:
 * <ul>
 *   <li>{@code regex}: regex the topic must fully match (default is {@code ".+"})</li>
 *   <li>{@code value}: header value used when the topic matches (default is {@code "daily"})</li>
 *   <li>{@code default}: header value used when the topic does not match
 *       (default is {@code "daily"})</li>
 * </ul>
 *
 * <p>Sample config. Note that {@code type} must name the nested {@code Builder} class:
 * Flume instantiates the configured class through its no-argument constructor, and
 * this interceptor itself only has a private three-argument constructor.
 *
 * <pre>
 * agent.sources.r1.channels = c1
 * agent.sources.r1.type = SEQ
 * agent.sources.r1.interceptors = i1
 * agent.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TopicRotationHeaderInterceptor$Builder
 * agent.sources.r1.interceptors.i1.regex = stat_.+
 * agent.sources.r1.interceptors.i1.value = hourly
 * agent.sources.r1.interceptors.i1.default = daily
 * </pre>
 */
public class TopicRotationHeaderInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(TopicRotationHeaderInterceptor.class);

    /** Header value written when the topic matches {@link #matchRegex}. */
    private final String value;
    /** Header value written when the topic is absent or does not match. */
    private final String defaultValue;
    /** Compiled topic pattern; may be null if the builder was never configured. */
    private final Pattern matchRegex;

    /**
     * Only {@link TopicRotationHeaderInterceptor.Builder} can build me.
     *
     * @param matchRegex   pattern the topic header must fully match, or null to never match
     * @param value        header value for matching topics
     * @param defaultValue header value for all other events
     */
    private TopicRotationHeaderInterceptor(Pattern matchRegex, String value, String defaultValue) {
        this.matchRegex = matchRegex;
        this.value = value;
        this.defaultValue = defaultValue;
    }

    @Override
    public void initialize() {
        // no-op
    }

    /**
     * Sets the rotation header on the given event, modifying it in place.
     *
     * @param event event whose headers are inspected and updated
     * @return the same event instance
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String topic = headers.get(Constants.TOPIC_HEADER);
        String resultValue = defaultValue;
        if (topic == null) {
            // Pattern.matcher(null) would throw a NullPointerException; an event
            // without a topic header simply gets the default value.
            logger.debug("Event has no '{}' header; using default rotation value",
                    Constants.TOPIC_HEADER);
        } else if (matchRegex != null && matchRegex.matcher(topic).matches()) {
            resultValue = value;
        }
        headers.put(Constants.HEADER, resultValue);
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     *
     * @param events events to update in place
     * @return the same list instance that was passed in
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // no-op
    }

    /**
     * Builder which builds new instances of the TopicRotationHeaderInterceptor.
     * This is the class that must be referenced from the Flume configuration.
     */
    public static class Builder implements Interceptor.Builder {
        private String value;
        private String defaultValue;
        private Pattern matchRegex;

        /**
         * Reads {@code regex}, {@code value} and {@code default} from the context,
         * falling back to the defaults in {@link Constants}.
         *
         * @param context interceptor configuration
         */
        @Override
        public void configure(Context context) {
            String regexStr = context.getString(Constants.REGEX, Constants.REGEX_DEFAULT);
            matchRegex = Pattern.compile(regexStr);
            value = context.getString(Constants.VALUE, Constants.VALUE_DEFAULT);
            defaultValue = context.getString(Constants.DEFAULT_VALUE, Constants.DEFAULT_VALUE_DEFAULT);
        }

        /**
         * @return a new interceptor using the values captured by {@link #configure(Context)}
         */
        @Override
        public Interceptor build() {
            return new TopicRotationHeaderInterceptor(matchRegex, value, defaultValue);
        }
    }

    /** Configuration keys, their defaults, and the header names used by this interceptor. */
    public static class Constants {
        public static final String REGEX = "regex";
        public static final String REGEX_DEFAULT = ".+";
        public static final String VALUE = "value";
        public static final String VALUE_DEFAULT = "daily";
        public static final String DEFAULT_VALUE = "default";
        public static final String DEFAULT_VALUE_DEFAULT = "daily";
        public static final String HEADER = "rotation";
        public static final String TOPIC_HEADER = "topic";
    }
}
你的 flume.conf 文件有误,请将拦截器类型从
org.apache.flume.interceptor.TopicRotationHeaderInterceptor
改为:
org.apache.flume.interceptor.TopicRotationHeaderInterceptor$Builder
Flume 会实例化这个 Builder 类,并调用它的 build 方法来创建拦截器。
问候
我为 Apache Flume 1.7 编写了一个自定义拦截器。该拦截器需要为来自 Kafka 源、且主题与配置中定义的正则表达式匹配的所有事件设置一个特殊的 header。但它无法正常工作。我对 Java 了解不多,请帮我解决这个问题。 我的部分配置 /etc/flume-ng/conf/flume.conf:
######################## kafka source ########################
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.interceptors = i1
agent.sources.kafka_source.interceptors.i1.type = org.apache.flume.interceptor.TopicRotationHeaderInterceptor
agent.sources.kafka_source.interceptors.i1.regex = stat_.+
agent.sources.kafka_source.interceptors.i1.value = hourly
agent.sources.kafka_source.interceptors.i1.default = daily
我在 flume.log 中遇到错误:
31 Jul 2017 18:41:11,819 ERROR [conf-file-poller-0] (org.apache.flume.channel.ChannelProcessor.configureInterceptors:118) - Could not instantiate Builder. Exception follows.
java.lang.InstantiationException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
31 Jul 2017 18:41:11,823 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSources:361) -
Source kafka_source has been removed due to an error during configuration
org.apache.flume.FlumeException: Interceptor.Builder not constructable.
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:119)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InstantiationException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
... 12 more
Caused by: java.lang.NoSuchMethodException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
拦截器源码:
package org.apache.flume.interceptor;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Interceptor that stamps every event with a rotation-period header chosen by topic.
 *
 * <p>The event's {@code topic} header is matched (full match) against a configured
 * regular expression. When it matches, the {@code rotation} header is set to the
 * configured {@code value}; otherwise, including when the event carries no
 * {@code topic} header at all, it is set to the configured {@code default}.
 *
 * <p>Properties:
 * <ul>
 *   <li>{@code regex}: regex the topic must fully match (default is {@code ".+"})</li>
 *   <li>{@code value}: header value used when the topic matches (default is {@code "daily"})</li>
 *   <li>{@code default}: header value used when the topic does not match
 *       (default is {@code "daily"})</li>
 * </ul>
 *
 * <p>Sample config. Note that {@code type} must name the nested {@code Builder} class:
 * Flume instantiates the configured class through its no-argument constructor, and
 * this interceptor itself only has a private three-argument constructor.
 *
 * <pre>
 * agent.sources.r1.channels = c1
 * agent.sources.r1.type = SEQ
 * agent.sources.r1.interceptors = i1
 * agent.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TopicRotationHeaderInterceptor$Builder
 * agent.sources.r1.interceptors.i1.regex = stat_.+
 * agent.sources.r1.interceptors.i1.value = hourly
 * agent.sources.r1.interceptors.i1.default = daily
 * </pre>
 */
public class TopicRotationHeaderInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(TopicRotationHeaderInterceptor.class);

    /** Header value written when the topic matches {@link #matchRegex}. */
    private final String value;
    /** Header value written when the topic is absent or does not match. */
    private final String defaultValue;
    /** Compiled topic pattern; may be null if the builder was never configured. */
    private final Pattern matchRegex;

    /**
     * Only {@link TopicRotationHeaderInterceptor.Builder} can build me.
     *
     * @param matchRegex   pattern the topic header must fully match, or null to never match
     * @param value        header value for matching topics
     * @param defaultValue header value for all other events
     */
    private TopicRotationHeaderInterceptor(Pattern matchRegex, String value, String defaultValue) {
        this.matchRegex = matchRegex;
        this.value = value;
        this.defaultValue = defaultValue;
    }

    @Override
    public void initialize() {
        // no-op
    }

    /**
     * Sets the rotation header on the given event, modifying it in place.
     *
     * @param event event whose headers are inspected and updated
     * @return the same event instance
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String topic = headers.get(Constants.TOPIC_HEADER);
        String resultValue = defaultValue;
        if (topic == null) {
            // Pattern.matcher(null) would throw a NullPointerException; an event
            // without a topic header simply gets the default value.
            logger.debug("Event has no '{}' header; using default rotation value",
                    Constants.TOPIC_HEADER);
        } else if (matchRegex != null && matchRegex.matcher(topic).matches()) {
            resultValue = value;
        }
        headers.put(Constants.HEADER, resultValue);
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     *
     * @param events events to update in place
     * @return the same list instance that was passed in
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // no-op
    }

    /**
     * Builder which builds new instances of the TopicRotationHeaderInterceptor.
     * This is the class that must be referenced from the Flume configuration.
     */
    public static class Builder implements Interceptor.Builder {
        private String value;
        private String defaultValue;
        private Pattern matchRegex;

        /**
         * Reads {@code regex}, {@code value} and {@code default} from the context,
         * falling back to the defaults in {@link Constants}.
         *
         * @param context interceptor configuration
         */
        @Override
        public void configure(Context context) {
            String regexStr = context.getString(Constants.REGEX, Constants.REGEX_DEFAULT);
            matchRegex = Pattern.compile(regexStr);
            value = context.getString(Constants.VALUE, Constants.VALUE_DEFAULT);
            defaultValue = context.getString(Constants.DEFAULT_VALUE, Constants.DEFAULT_VALUE_DEFAULT);
        }

        /**
         * @return a new interceptor using the values captured by {@link #configure(Context)}
         */
        @Override
        public Interceptor build() {
            return new TopicRotationHeaderInterceptor(matchRegex, value, defaultValue);
        }
    }

    /** Configuration keys, their defaults, and the header names used by this interceptor. */
    public static class Constants {
        public static final String REGEX = "regex";
        public static final String REGEX_DEFAULT = ".+";
        public static final String VALUE = "value";
        public static final String VALUE_DEFAULT = "daily";
        public static final String DEFAULT_VALUE = "default";
        public static final String DEFAULT_VALUE_DEFAULT = "daily";
        public static final String HEADER = "rotation";
        public static final String TOPIC_HEADER = "topic";
    }
}
你的 flume.conf 文件有误,请将拦截器类型从
org.apache.flume.interceptor.TopicRotationHeaderInterceptor
改为:
org.apache.flume.interceptor.TopicRotationHeaderInterceptor$Builder
Flume 会实例化这个 Builder 类,并调用它的 build 方法来创建拦截器。
问候