如何克服kafka.consumer.ConsumerTimeoutException?
How to overcome kafka.consumer.ConsumerTimeoutException?
我正在使用 kafka 2.11 版本编写消费者,但不断收到超时异常。我不确定我在这里是否使用了正确的 API。
有人可以帮我吗?
执行者
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
 * Creates a Kafka high-level consumer connector for a single topic and hands each
 * of the topic's message streams to a {@link ListenerThread} running on a fixed
 * thread pool.
 */
public class MessageListener {
    private Properties properties;
    private ConsumerConnector consumerConnector;
    private final String topic;
    private ExecutorService executor;

    /**
     * Loads the consumer configuration and creates the consumer connector.
     *
     * <p>If the configuration cannot be found the stack trace is printed and the
     * connector is left unset; {@link #start(File)} then fails with an explicit
     * {@link IllegalStateException}.
     *
     * @param topic the topic whose messages will be consumed
     */
    public MessageListener(String topic) {
        this.topic = topic;
        KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
        try {
            properties = confLoader.loadConsumerConfig();
            ConsumerConfig consumerConfig = new ConsumerConfig(properties);
            consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        } catch (FileNotFoundException e) {
            // The connector stays null here; start() checks for that and reports it.
            e.printStackTrace();
        }
    }

    /**
     * Opens {@code CoreConstants.THREAD_SIZE} streams for the topic and submits one
     * {@link ListenerThread} per stream to a fixed thread pool of the same size.
     *
     * @param file not used by this method; kept so existing callers still compile
     * @throws IllegalStateException if the consumer connector could not be created
     *     in the constructor
     */
    public void start(File file) {
        if (consumerConnector == null) {
            // Previously this surfaced as a bare NullPointerException with no hint of the cause.
            throw new IllegalStateException(
                    "Kafka consumer connector was not created; consumer configuration failed to load");
        }

        Map<String, Integer> topicCountMap = new HashMap<>();
        // Integer.valueOf replaces the deprecated boxing constructor.
        topicCountMap.put(topic, Integer.valueOf(CoreConstants.THREAD_SIZE));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
        for (KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ListenerThread(stream));
        }
    }

    /**
     * Releases the consumer connector and the worker pool. Safe to call when the
     * connector was never created or {@link #start(File)} was never invoked.
     */
    public void shutdown() {
        // Connector first: presumably this lets the workers' stream iterators finish
        // so the pool can drain — confirm against the Kafka client version in use.
        if (consumerConnector != null) {
            consumerConnector.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }
}
线程
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
/**
 * Worker that reads every message from one Kafka stream and passes the message's
 * topic and payload to a {@code FileProcessor}.
 *
 * <p>The loop stops when the iterator reports no further messages, when a
 * {@link ConsumerTimeoutException} is raised while waiting for a message, or when
 * any other exception escapes processing.
 */
public class ListenerThread implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;

    /**
     * @param msgStream the stream this worker consumes
     */
    public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
        this.stream = msgStream;
    }

    @Override
    public void run() {
        try {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                // Use next(), not makeNext(): hasNext() has already fetched and buffered
                // the upcoming message. makeNext() is the iterator's internal fetch hook,
                // so calling it here would pull a further message, skipping the buffered
                // one and blocking again (which can itself time out).
                MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
                String topic = messageAndMetadata.topic();
                byte[] message = messageAndMetadata.message();
                System.out.println("111111111111111111111111111");
                FileProcessor processor = new FileProcessor();
                processor.processFile(topic, message);
            }
        } catch (ConsumerTimeoutException cte) {
            // Raised when no message arrives within consumer.timeout.ms; this worker
            // then stops consuming.
            System.out.println("Consumer timed out");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
如果不想抛出这个异常,可以将 consumer.timeout.ms 设置为 -1(即一直阻塞等待新消息,不再超时)。
我正在使用 kafka 2.11 版本编写消费者,但不断收到超时异常。我不确定我在这里是否使用了正确的 API。
有人可以帮我吗?
执行者
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
 * Creates a Kafka high-level consumer connector for a single topic and hands each
 * of the topic's message streams to a {@link ListenerThread} running on a fixed
 * thread pool.
 */
public class MessageListener {
    private Properties properties;
    private ConsumerConnector consumerConnector;
    private final String topic;
    private ExecutorService executor;

    /**
     * Loads the consumer configuration and creates the consumer connector.
     *
     * <p>If the configuration cannot be found the stack trace is printed and the
     * connector is left unset; {@link #start(File)} then fails with an explicit
     * {@link IllegalStateException}.
     *
     * @param topic the topic whose messages will be consumed
     */
    public MessageListener(String topic) {
        this.topic = topic;
        KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
        try {
            properties = confLoader.loadConsumerConfig();
            ConsumerConfig consumerConfig = new ConsumerConfig(properties);
            consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        } catch (FileNotFoundException e) {
            // The connector stays null here; start() checks for that and reports it.
            e.printStackTrace();
        }
    }

    /**
     * Opens {@code CoreConstants.THREAD_SIZE} streams for the topic and submits one
     * {@link ListenerThread} per stream to a fixed thread pool of the same size.
     *
     * @param file not used by this method; kept so existing callers still compile
     * @throws IllegalStateException if the consumer connector could not be created
     *     in the constructor
     */
    public void start(File file) {
        if (consumerConnector == null) {
            // Previously this surfaced as a bare NullPointerException with no hint of the cause.
            throw new IllegalStateException(
                    "Kafka consumer connector was not created; consumer configuration failed to load");
        }

        Map<String, Integer> topicCountMap = new HashMap<>();
        // Integer.valueOf replaces the deprecated boxing constructor.
        topicCountMap.put(topic, Integer.valueOf(CoreConstants.THREAD_SIZE));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
        for (KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ListenerThread(stream));
        }
    }

    /**
     * Releases the consumer connector and the worker pool. Safe to call when the
     * connector was never created or {@link #start(File)} was never invoked.
     */
    public void shutdown() {
        // Connector first: presumably this lets the workers' stream iterators finish
        // so the pool can drain — confirm against the Kafka client version in use.
        if (consumerConnector != null) {
            consumerConnector.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }
}
线程
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
/**
 * Worker that reads every message from one Kafka stream and passes the message's
 * topic and payload to a {@code FileProcessor}.
 *
 * <p>The loop stops when the iterator reports no further messages, when a
 * {@link ConsumerTimeoutException} is raised while waiting for a message, or when
 * any other exception escapes processing.
 */
public class ListenerThread implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;

    /**
     * @param msgStream the stream this worker consumes
     */
    public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
        this.stream = msgStream;
    }

    @Override
    public void run() {
        try {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                // Use next(), not makeNext(): hasNext() has already fetched and buffered
                // the upcoming message. makeNext() is the iterator's internal fetch hook,
                // so calling it here would pull a further message, skipping the buffered
                // one and blocking again (which can itself time out).
                MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
                String topic = messageAndMetadata.topic();
                byte[] message = messageAndMetadata.message();
                System.out.println("111111111111111111111111111");
                FileProcessor processor = new FileProcessor();
                processor.processFile(topic, message);
            }
        } catch (ConsumerTimeoutException cte) {
            // Raised when no message arrives within consumer.timeout.ms; this worker
            // then stops consuming.
            System.out.println("Consumer timed out");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
如果不想抛出这个异常,可以将 consumer.timeout.ms 设置为 -1(即一直阻塞等待新消息,不再超时)。