Kafka 生产者 - org.apache.kafka.common.serialization.StringSerializer 找不到
Kafka Producer - org.apache.kafka.common.serialization.StringSerializer could not be found
我已经创建了一个简单的 Kafka Producer & Consumer.I 我正在使用 kafka_2.11-0.9.0.0。这是我的生产者代码。
public class KafkaProducerTest {
public static String topicName = "test-topic-2";
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
StringSerializer.class.getName());
props.put("value.serializer",
StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer(props);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
topicName, Integer.toString(i), Integer.toString(i));
System.out.println(producerRecord);
producer.send(producerRecord);
}
producer.close();
}
}
启动捆绑包时我遇到了以下错误:
2016-05-20 09:44:57,792 | ERROR | nsole user karaf | ShellUtil | 44 - org.apache.karaf.shell.core - 4.0.3 | Exception caught while executing command
org.apache.karaf.shell.support.MultiException: Error executing command on bundles:
Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.karaf.shell.support.MultiException.throwIf(MultiException.java:61)
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:69)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.bundle.command.BundlesCommand.execute(BundlesCommand.java:54)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.shell.impl.action.command.ActionCommand.execute(ActionCommand.java:83)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:67)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:87)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.executeCmd(Closure.java:480)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.executeStatement(Closure.java:406)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Pipe.run(Pipe.java:108)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:182)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:119)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.CommandSessionImpl.execute(CommandSessionImpl.java:94)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.ConsoleSessionImpl.run(ConsoleSessionImpl.java:270)[44:org.apache.karaf.shell.core:4.0.3]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_66]
Caused by: java.lang.Exception: Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:66)[24:org.apache.karaf.bundle.core:4.0.3]
... 12 more
Caused by: org.osgi.framework.BundleException: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.felix.framework.Felix.activateBundle(Felix.java:2276)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.Felix.startBundle(Felix.java:2144)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:998)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.karaf.bundle.command.Start.executeOnBundle(Start.java:38)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:64)[24:org.apache.karaf.bundle.core:4.0.3]
... 12 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:317)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at com.NewKafka.NewKafkaArtifact.KafkaProducerTest.main(KafkaProducerTest.java:25)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
at com.NewKafka.NewKafkaArtifact.StartKafka.start(StartKafka.java:11)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
at org.apache.felix.framework.util.SecureAction.startActivator(SecureAction.java:697)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.Felix.activateBundle(Felix.java:2226)[org.apache.felix.framework-5.4.0.jar:]
... 16 more
我试过如下设置 key.serializer
和 value.serializer
:
props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",StringSerializer.class.getName());
也喜欢,但还是报同样的错误。我在这里做错了什么。
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
您使用的版本有问题。
也有人建议版本0.8.2.2_1。
建议您调整您使用的kafka版本并尝试一下。
代码方面,我交叉检查了 kafka 开发列表中的许多代码示例,看起来你写的方式是正确的。
即Thread.currentThread().setContextClassLoader(null);
最近我找到了解决办法。将 Thead 上下文加载器设置为 null 为我解决了这个问题。谢谢
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);
通过阅读kafka客户端源码找到原因
kafka客户端使用Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader())
获取Class对象,并创建实例,关键点是classLoader,由最后一个参数指定,方法 Utils.getContextOrKafkaClassLoader()
的实现是
public static ClassLoader getContextOrKafkaClassLoader() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null)
return getKafkaClassLoader();
else
return cl;
}
因此,默认情况下,org.apache.kafka.common.serialization.StringSerializer
的 Class 对象由应用程序ClassLoader 加载,如果您的目标 class 未被应用程序加载Class加载程序,会出现这个问题!
要解决这个问题,只需像这样在新的 KafkaProducer 实例之前将当前线程的 ContextClassLoader 设置为 null
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);
希望我的回答能让你知道发生了什么。
尝试使用这些道具而不是您的道具。
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
这是完整的 Kafka Producer 示例:-
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class FxDateProducer {
public static void main(String[] args) throws Exception{
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
String topicName = args[0].toString();
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
问题似乎出在 class 加载器上,正如@Ram Ghadiyaram 在他的回答中指出的那样。为了让它与 kafka-clients 2.x 一起工作,我必须执行以下操作:
public Producer<String, String> createProducer() {
ClassLoader original = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
... etc ...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Thread.currentThread().setContextClassLoader(original);
return producer;
}
这允许系统继续使用原始 classloader 加载额外的 classes。这在 Wildfly/JBoss 中是必需的(我正在使用的特定应用程序是 Keycloak)。
这是因为 kafka 版本问题。确保使用正确的 kafka 版本。我使用的版本是'kafka_2.12-1.0.1'
但请尝试在您的代码中使用以下属性。这解决了我的问题。
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
之前我使用的是导致问题的以下属性。
//props.put("key.serializer","org.apache.kafka.common.serialization.Stringserializer");
//props.put("value.serializer","org.apache.kafka.common.serialization.Stringserializer");
我已经创建了一个简单的 Kafka Producer & Consumer.I 我正在使用 kafka_2.11-0.9.0.0。这是我的生产者代码。
public class KafkaProducerTest {
public static String topicName = "test-topic-2";
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
StringSerializer.class.getName());
props.put("value.serializer",
StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer(props);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
topicName, Integer.toString(i), Integer.toString(i));
System.out.println(producerRecord);
producer.send(producerRecord);
}
producer.close();
}
}
启动捆绑包时我遇到了以下错误:
2016-05-20 09:44:57,792 | ERROR | nsole user karaf | ShellUtil | 44 - org.apache.karaf.shell.core - 4.0.3 | Exception caught while executing command
org.apache.karaf.shell.support.MultiException: Error executing command on bundles:
Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.karaf.shell.support.MultiException.throwIf(MultiException.java:61)
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:69)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.bundle.command.BundlesCommand.execute(BundlesCommand.java:54)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.shell.impl.action.command.ActionCommand.execute(ActionCommand.java:83)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:67)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:87)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.executeCmd(Closure.java:480)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.executeStatement(Closure.java:406)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Pipe.run(Pipe.java:108)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:182)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:119)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.CommandSessionImpl.execute(CommandSessionImpl.java:94)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.ConsoleSessionImpl.run(ConsoleSessionImpl.java:270)[44:org.apache.karaf.shell.core:4.0.3]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_66]
Caused by: java.lang.Exception: Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:66)[24:org.apache.karaf.bundle.core:4.0.3]
... 12 more
Caused by: org.osgi.framework.BundleException: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.felix.framework.Felix.activateBundle(Felix.java:2276)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.Felix.startBundle(Felix.java:2144)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:998)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.karaf.bundle.command.Start.executeOnBundle(Start.java:38)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:64)[24:org.apache.karaf.bundle.core:4.0.3]
... 12 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:317)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at com.NewKafka.NewKafkaArtifact.KafkaProducerTest.main(KafkaProducerTest.java:25)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
at com.NewKafka.NewKafkaArtifact.StartKafka.start(StartKafka.java:11)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
at org.apache.felix.framework.util.SecureAction.startActivator(SecureAction.java:697)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.Felix.activateBundle(Felix.java:2226)[org.apache.felix.framework-5.4.0.jar:]
... 16 more
我试过如下设置 key.serializer
和 value.serializer
:
props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",StringSerializer.class.getName());
也喜欢,但还是报同样的错误。我在这里做错了什么。
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
您使用的版本有问题。 也有人建议版本0.8.2.2_1。 建议您调整您使用的kafka版本并尝试一下。 代码方面,我交叉检查了 kafka 开发列表中的许多代码示例,看起来你写的方式是正确的。
即Thread.currentThread().setContextClassLoader(null);
最近我找到了解决办法。将 Thead 上下文加载器设置为 null 为我解决了这个问题。谢谢
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);
通过阅读kafka客户端源码找到原因
kafka客户端使用Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader())
获取Class对象,并创建实例,关键点是classLoader,由最后一个参数指定,方法 Utils.getContextOrKafkaClassLoader()
的实现是
public static ClassLoader getContextOrKafkaClassLoader() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null)
return getKafkaClassLoader();
else
return cl;
}
因此,默认情况下,org.apache.kafka.common.serialization.StringSerializer
的 Class 对象由应用程序ClassLoader 加载,如果您的目标 class 未被应用程序加载Class加载程序,会出现这个问题!
要解决这个问题,只需像这样在新的 KafkaProducer 实例之前将当前线程的 ContextClassLoader 设置为 null
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);
希望我的回答能让你知道发生了什么。
尝试使用这些道具而不是您的道具。
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
这是完整的 Kafka Producer 示例:-
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class FxDateProducer {
public static void main(String[] args) throws Exception{
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
String topicName = args[0].toString();
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
问题似乎出在 class 加载器上,正如@Ram Ghadiyaram 在他的回答中指出的那样。为了让它与 kafka-clients 2.x 一起工作,我必须执行以下操作:
public Producer<String, String> createProducer() {
ClassLoader original = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
... etc ...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Thread.currentThread().setContextClassLoader(original);
return producer;
}
这允许系统继续使用原始 classloader 加载额外的 classes。这在 Wildfly/JBoss 中是必需的(我正在使用的特定应用程序是 Keycloak)。
这是因为 kafka 版本问题。确保使用正确的 kafka 版本。我使用的版本是'kafka_2.12-1.0.1'
但请尝试在您的代码中使用以下属性。这解决了我的问题。
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
之前我使用的是导致问题的以下属性。
//props.put("key.serializer","org.apache.kafka.common.serialization.Stringserializer");
//props.put("value.serializer","org.apache.kafka.common.serialization.Stringserializer");