Why does this consumer (running on a servlet) not receive any messages from the Kafka topic?
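I have set up a Kafka consumer in my servlet. It reads from my Kafka topic, and I want to print the latest value using the servlet's doGet method.
But when I send a GET request, I only get "null" back...
I'm not sure, but I found another post in which someone mentioned that this can happen when there is no connection to the ZooKeeper server. So I tried to fix it by adding zookeeper.connect to the consumer's properties, even though it is deprecated.
I hope someone can give me some advice.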
Code of my Servlet:
public class KafkaServlet extends HttpServlet implements Runnable {

    public String kafkaMessage;
    private Properties props;
    private KafkaConsumer<String, String> consumer;
    Thread Trans;

    /**
     * Sets up the properties and the consumer in the constructor of KafkaServlet.
     */
    public KafkaServlet() {
        props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); //replace 'localhost' with the actual IP to get it working.
        props.put("zookeeper.connect", "localhost:2181");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("kafkaTopic"));
    }

    /**
     * Sets up the polling thread and starts it from the init method.
     */
    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);
        Trans = new Thread(this);
        Trans.setPriority(Thread.MIN_PRIORITY);
        Trans.start();
    }

    /**
     * Implement the doPost method here if we want to use it.
     */
    public void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
        //empty -- nothing to post here
    }

    /**
     * doGet prints out the latest message received from Kafka.
     */
    @Override
    public void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
        res.setContentType("text/html");
        PrintWriter out = res.getWriter();
        String title = "Reading Parameters from kafkaTopic";
        out.println("<HTML>" +
                "<BODY>\n" +
                "<H1 ALIGN=CENTER>" + title + "</H1>\n" +
                "<UL>\n" +
                " <LI>MESSAGE: "
                + kafkaMessage + "\n" +
                "</UL>\n" +
                "</BODY></HTML>");
    }

    /**
     * Thread body: grabs the messages from Kafka and stores the latest one in "kafkaMessage".
     */
    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    //KafkaMessage.setMessage(record.value());
                    kafkaMessage = record.value();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
EDIT: Latest logfiles from "localhost.log":
19-Dec-2016 13:42:42.773 INFO [localhost-startStop-2] org.apache.catalina.core.ApplicationContext.log SessionListener: contextDestroyed()
19-Dec-2016 13:42:42.774 INFO [localhost-startStop-2] org.apache.catalina.core.ApplicationContext.log ContextListener: contextDestroyed()
19-Dec-2016 13:42:54.742 INFO [localhost-startStop-1] org.apache.catalina.core.ApplicationContext.log ContextListener: contextInitialized()
19-Dec-2016 13:42:54.745 INFO [localhost-startStop-1] org.apache.catalina.core.ApplicationContext.log SessionListener: contextInitialized()
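I figured it out: the reason I wasn't receiving any messages was the line props.put("bootstrap.servers", "localhost:9092");
I had to change "localhost" to the actual IP of my VM. That's a bit odd, because everything else I configured (in Apache Storm, in Kafka, and in another servlet that contains a Kafka producer) works fine with "localhost".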
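For anyone hitting the same thing, here is a rough sketch of how the broker address could be kept out of the source so that switching from "localhost" to the VM's IP does not need a code change. It is only an illustration under some assumptions: the class name ConsumerProps, the system property name kafka.bootstrap.servers, and the address 192.0.2.10:9092 are all made up for this example and are not from the original servlet. It also leaves out zookeeper.connect, since the KafkaConsumer class connects through bootstrap.servers and, as far as I know, does not use that setting.

```java
import java.util.Properties;

// Hypothetical helper (not part of the original servlet): builds the
// consumer properties with the broker address passed in from outside.
final class ConsumerProps {

    private ConsumerProps() {
        // static helper only
    }

    /** Returns consumer properties for the given broker address ("host:port"). */
    static Properties build(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        Properties props = new Properties();
        // The address the consumer uses for its first connection to the cluster.
        props.put("bootstrap.servers", bootstrapServers.trim());
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "kafka.bootstrap.servers" is an assumed property name; the fallback
        // 192.0.2.10 is a documentation-only placeholder, not a real broker.
        String servers = System.getProperty("kafka.bootstrap.servers", "192.0.2.10:9092");
        Properties props = build(servers);
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
    }
}
```

In the servlet constructor the result would then be handed to new KafkaConsumer<>(...) in place of the hard-coded properties, with the address supplied at startup, for example through -Dkafka.bootstrap.servers=<VM-IP>:9092 on the servlet container's JVM.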