使用 Servlet 连接 KafkaProducer
Connect KafkaProducer using Servlet
我正在尝试使用 servlet 连接 KafkaProducer 并在 eclipse 中设置我的项目。但是当我 运行 KafkaProducer 我得到一个例外:
org.apache.catalina.core.StandardWrapperValve invoke SEVERE:
Servlet.service() for servlet [simple_kafka.SimpleProducer] in context
with path [/kafka_web] threw exception [Servlet execution threw an
exception] with root cause java.lang.ClassNotFoundException:
org.apache.kafka.clients.producer.KafkaProducer at
org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1291)
at
org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1119)
at simple_kafka.SimpleProducer.doPost(SimpleProducer.java:56) at
javax.servlet.http.HttpServlet.service(HttpServlet.java:661) at
javax.servlet.http.HttpServlet.service(HttpServlet.java:742) at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at
org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at
org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
at
org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at
org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478)
at
org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at
org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at
org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:650)
at
org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at
org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at
org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
at
org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at
org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
at
org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
我的 KafkaProducerServlet 如下:
package simple_kafka;
import java.io.IOException;
import java.util.Properties;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
//import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* Servlet implementation class SimpleProducer
*/
@WebServlet("/producer")
public class SimpleProducer extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public SimpleProducer() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response)
*/
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
// response.getWriter().append("Served at: ").append(request.getContextPath());
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse response)
*/
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
doGet(request, response);
response.getWriter().append("Served at: ").append(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
String topicName = "ReplicaTopic";
String key = "Key1";
String value = "Value-1";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.59:9092,192.168.1.59:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer <String, String>(props);
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);
producer.close();
}
catch(Exception e) {
response.getWriter().append("Exception at: ").append((CharSequence) e);
}
// response.sendRedirect("/consumer");
}
}
我在项目 属性 的 BuildPath 中包含了 Following JAR:
- 卡夫卡 2.12-0.11.0.1.jar
- kafka-clients-0.11.0.1.jar
- slf4j-api-1.7.25.jar
经过一整天的搜索,我发现您需要包含 JAR 文件 WEB-INF-> lib 文件夹,并且您需要从 Eclipse 中删除您的服务器并重新配置它。这样做之后,我可以 运行 KafkaProducer。
我正在尝试使用 servlet 连接 KafkaProducer 并在 eclipse 中设置我的项目。但是当我 运行 KafkaProducer 我得到一个例外:
org.apache.catalina.core.StandardWrapperValve invoke SEVERE: Servlet.service() for servlet [simple_kafka.SimpleProducer] in context with path [/kafka_web] threw exception [Servlet execution threw an exception] with root cause java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.KafkaProducer at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1291) at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1119) at simple_kafka.SimpleProducer.doPost(SimpleProducer.java:56) at javax.servlet.http.HttpServlet.service(HttpServlet.java:661) at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:650) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748)
我的 KafkaProducerServlet 如下:
package simple_kafka;
import java.io.IOException;
import java.util.Properties;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
//import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* Servlet implementation class SimpleProducer
*/
@WebServlet("/producer")
public class SimpleProducer extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public SimpleProducer() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response)
*/
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
// response.getWriter().append("Served at: ").append(request.getContextPath());
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse response)
*/
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
doGet(request, response);
response.getWriter().append("Served at: ").append(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
String topicName = "ReplicaTopic";
String key = "Key1";
String value = "Value-1";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.59:9092,192.168.1.59:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer <String, String>(props);
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);
producer.close();
}
catch(Exception e) {
response.getWriter().append("Exception at: ").append((CharSequence) e);
}
// response.sendRedirect("/consumer");
}
}
我在项目 属性 的 BuildPath 中包含了 Following JAR:
- 卡夫卡 2.12-0.11.0.1.jar
- kafka-clients-0.11.0.1.jar
- slf4j-api-1.7.25.jar
经过一整天的搜索,我发现您需要包含 JAR 文件 WEB-INF-> lib 文件夹,并且您需要从 Eclipse 中删除您的服务器并重新配置它。这样做之后,我可以 运行 KafkaProducer。