NoClassDefFoundError: kafka/api/OffsetRequest
NoClassDefFoundError: kafka/api/OffsetRequest
我正在尝试使用 apache storm、kafka 和 trident 编写实时处理应用程序
但是在 TridentKafkaConfig 的初始化中我看到了这个错误
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43)
at storm.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:30)
at spout.TestSpout.<clinit>(TestSpout.java:22)
at IOTTridentTopology.initializeTridentTopology(IOTTridentTopology.java:31)
at IOTTridentTopology.main(IOTTridentTopology.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 10 more
我的嘴 class 是
public class TestSpout extends OpaqueTridentKafkaSpout {
private static TridentKafkaConfig config;
private static BrokerHosts HOSTS = new ZkHosts(TridentConfig.ZKHOSTS);
private static String TOPIC = "test";
private static int BUFFER_SIZE = TridentConfig.BUFFER_SIZE;
static{
config = new TridentKafkaConfig(HOSTS, TOPIC);
config.scheme = new SchemeAsMultiScheme(new RawScheme());
config.bufferSizeBytes = BUFFER_SIZE;
}
public TestSpout(TridentKafkaConfig config) {
super(config);
}
public TestSpout() {
super(config);
}
}
主要class:
public static void main(String[] args) {
initializeTridentTopology();
}
private static void initializeTridentTopology() {
TridentTopology topology = new TridentTopology();
TestSpout spout = new TestSpout();
//////////////// test //////////////////////
topology.newStream("testspout", spout).each(spout.getOutputFields(), new TestFunction(), new Fields());
/////////////// end test ///////////////////
LocalCluster cluster = new LocalCluster();
Config config = new Config();
config.setDebug(false);
config.setMaxTaskParallelism(1);
config.registerSerialization(storm.kafka.trident.GlobalPartitionInformation.class);
config.registerSerialization(java.util.TreeMap.class);
config.setNumWorkers(5);
config.setFallBackOnJavaSerialization(true);
cluster.submitTopology("KafkaTrident", config, topology.build());
}
和我的 pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
<groupId>IOT</groupId>
<artifactId>ver0.1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.3</version>
</dependency>
</dependencies>
我正在尝试不同版本的 storm-kafka(0.9.3、0.9.4、0.9.5、0.9.6 和 0.10.0)和 storm-core(9.3、9.4 和 9.6)
但我仍然看到我之前的错误
通过谷歌搜索我找到了这个link但是...
ClassNotFoundException: kafka.api.OffsetRequest
如果您使用 LocalCluster 部署风暴拓扑,您需要将 Kafka 库添加到您的依赖项中(对于 Storm 0.10.0):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
kafka.api.OffsetRequest class 被遗漏了,因为 org.apache.kafka 是 提供的 对 storm-kafka 的依赖:
http://mvnrepository.com/artifact/org.apache.storm/storm-kafka/0.10.0。请参阅提供的依赖项部分了解详细信息。
谷歌搜索后我发现了这个link
https://github.com/wurstmeister/storm-kafka-0.8-plus-test
并在 pom.xml 文件
中找到了我的答案
通过添加此代码并找到兼容的 kafka 版本,所有问题都已解决
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
我正在尝试使用 apache storm、kafka 和 trident 编写实时处理应用程序 但是在 TridentKafkaConfig 的初始化中我看到了这个错误
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43)
at storm.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:30)
at spout.TestSpout.<clinit>(TestSpout.java:22)
at IOTTridentTopology.initializeTridentTopology(IOTTridentTopology.java:31)
at IOTTridentTopology.main(IOTTridentTopology.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 10 more
我的嘴 class 是
public class TestSpout extends OpaqueTridentKafkaSpout {
private static TridentKafkaConfig config;
private static BrokerHosts HOSTS = new ZkHosts(TridentConfig.ZKHOSTS);
private static String TOPIC = "test";
private static int BUFFER_SIZE = TridentConfig.BUFFER_SIZE;
static{
config = new TridentKafkaConfig(HOSTS, TOPIC);
config.scheme = new SchemeAsMultiScheme(new RawScheme());
config.bufferSizeBytes = BUFFER_SIZE;
}
public TestSpout(TridentKafkaConfig config) {
super(config);
}
public TestSpout() {
super(config);
}
}
主要class:
public static void main(String[] args) {
initializeTridentTopology();
}
private static void initializeTridentTopology() {
TridentTopology topology = new TridentTopology();
TestSpout spout = new TestSpout();
//////////////// test //////////////////////
topology.newStream("testspout", spout).each(spout.getOutputFields(), new TestFunction(), new Fields());
/////////////// end test ///////////////////
LocalCluster cluster = new LocalCluster();
Config config = new Config();
config.setDebug(false);
config.setMaxTaskParallelism(1);
config.registerSerialization(storm.kafka.trident.GlobalPartitionInformation.class);
config.registerSerialization(java.util.TreeMap.class);
config.setNumWorkers(5);
config.setFallBackOnJavaSerialization(true);
cluster.submitTopology("KafkaTrident", config, topology.build());
}
和我的 pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0
<groupId>IOT</groupId>
<artifactId>ver0.1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.3</version>
</dependency>
</dependencies>
我正在尝试不同版本的 storm-kafka(0.9.3、0.9.4、0.9.5、0.9.6 和 0.10.0)和 storm-core(9.3、9.4 和 9.6)
但我仍然看到我之前的错误
通过谷歌搜索我找到了这个link但是...
ClassNotFoundException: kafka.api.OffsetRequest
如果您使用 LocalCluster 部署风暴拓扑,您需要将 Kafka 库添加到您的依赖项中(对于 Storm 0.10.0):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
kafka.api.OffsetRequest class 被遗漏了,因为 org.apache.kafka 是 提供的 对 storm-kafka 的依赖: http://mvnrepository.com/artifact/org.apache.storm/storm-kafka/0.10.0。请参阅提供的依赖项部分了解详细信息。
谷歌搜索后我发现了这个link https://github.com/wurstmeister/storm-kafka-0.8-plus-test 并在 pom.xml 文件
中找到了我的答案通过添加此代码并找到兼容的 kafka 版本,所有问题都已解决
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>