Class for key.serializer could not be found
I am sending an array of JSON data to a Kafka topic from a Spring Boot application, but I get the following error:
org.apache.kafka.common.config.ConfigException: Invalid value
org.apache.kafka.common.serialization.StringSerializer; for configuration
key.serializer: Class org.apache.kafka.common.serialization.StringSerializer;
could not be found.
I tried changing the serializer configuration from:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer;");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer;");
to:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer;");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer;");
Configuration class and service class code:
@Configuration
public class KafkaProducerConfig {

    @Bean
    private static ProducerFactory<String, String> producerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer;");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer;");
        // The following properties are used by LiKafkaProducerImpl
        props.put("large.message.enabled", "true");
        props.put("max.message.segment.bytes", 1000 * 1024);
        props.put("segment.serializer", DefaultSegmentSerializer.class.getName());
        props.put("auditor.class", LoggingAuditor.class.getName());
        return new DefaultKafkaProducerFactory(props);
    }
}
@Service
public class KafkaSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.name}")
    private String topicName;

    public void sendData(List<Student> student) {
        System.out.println("Inside Student" + student.toString());
        System.out.println("Inside Student" + student);
        // TODO Auto-generated method stub
        Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.TOPIC, topicName);
        System.out.println("\nStudent= " + headers);
        // Construct a JSONObject from a Map.
        JSONObject HeaderObject = new JSONObject(headers);
        System.out.println("\nUsing new JSONObject() ==> " + HeaderObject);
        final String record = HeaderObject.toString();
        final int recordSize = record.length();
        kafkaTemplate.send(new GenericMessage<>(student, headers));
        LOGGER.info("Data - " + student + " sent to Kafka Topic - " + topicName);
    }
}
POST JSON:
[{
    "studentId": "Q45678123",
    "firstName": "abc",
    "lastName": "xyz",
    "age": "12",
    "address": {
        "apartment": "apt 123",
        "street": "street Info",
        "state": "state",
        "city": "city",
        "postCode": "12345"
    }
},
{
    "studentId": "Q45678123",
    "firstName": "abc",
    "lastName": "xyz",
    "age": "12",
    "address": {
        "apartment": "apt 123",
        "street": "street Info",
        "state": "state",
        "city": "city",
        "postCode": "12345"
    }
}]
You need to remove the semicolon at the end of each value:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
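The reason this matters: Kafka resolves these config values by class name (via Class.forName under the hood), so a trailing semicolon becomes part of the looked-up name and the lookup fails with "could not be found". A minimal JDK-only sketch (using java.lang.StringBuilder as a stand-in class name; SerializerNameDemo and resolvable are just illustrative names) shows the failure mode:

```java
public class SerializerNameDemo {

    // Roughly mimics how a configured serializer name is resolved to a Class.
    static boolean resolvable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolvable("java.lang.StringBuilder"));   // true
        System.out.println(resolvable("java.lang.StringBuilder;"));  // false: the ';' is part of the name
    }
}
```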
Alternatively, use the class.getName() approach, as you already do for the segment serializer. I would recommend that as the safer option, since it guarantees at compile time that the serializer you want is available on the classpath.
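A sketch of the producer factory rewritten that way, using the ProducerConfig constants instead of raw strings (this assumes kafka-clients and spring-kafka on the classpath; the LinkedIn-specific LiKafka properties from the question are left out, and the value type is byte[] to match the ByteArraySerializer):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // getName() yields the exact binary class name -- no typos, no stray
        // semicolons, and the class must exist at compile time.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(props);
    }
}
```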