问题序列化 Spring 批处理 Kafka ChunkRequest
Problem Serializing Spring batch Kafka ChunkRequest
我正在编写示例 Spring 批处理 Kafka 集成,带有远程分块。
一开始,master 在一些 Chunk 中读取了一些样本记录(Item.java)。然后将这个块发送到(Spring 集成)通道,Kafka 生产者将这个块发送到 Kafka 主题。
问题是 KafkaTemplate 无法序列化 ChenkRequest。
如果我使用:
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
然后出现此错误:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.batch.integration.chunk.ChunkRequest to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
如果我编写自定义序列化程序,例如:
public class CustomSerializer implements Serializer<ChunkRequest<Item>> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, ChunkRequest<Item> data) {
try {
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
和这个配置:
spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer
此错误会引发:
com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (WhosebugError) (through reference chain: org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]
所以,问题是什么!?
org:spring-projects org.springframework.batch.integration.chunk.ChunkRequest
不是 JSON-compatible。考虑编写一个 Serializer<ChunkRequest<Item>>
,它将使用标准 Java 序列化功能进行序列化。您可以使用 Spring 中的 org.springframework.core.serializer.DefaultSerializer.serializeToByteArray()
作为代表。
可能你在另一边需要一个类似的反序列化器...
谢谢阿尔乔姆。
成功了!
我写了一个客户序列化程序:
public class CustomSerializer implements Serializer<Object> {
@Override
public byte[] serialize(String topic, Object data) {
try {
return new DefaultSerializer().serializeToByteArray(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
}
配置:
spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer
和自定义反序列化器:
public class CustomDeserializer implements Deserializer<Object> {
@Override
public Object deserialize(String s, byte[] bytes) {
try {
return new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
配置:
spring.kafka.consumer.value-deserializer=com.example.myremotechunking.batch.CustomDeserializer
我正在编写示例 Spring 批处理 Kafka 集成,带有远程分块。 一开始,master 在一些 Chunk 中读取了一些样本记录(Item.java)。然后将这个块发送到(Spring 集成)通道,Kafka 生产者将这个块发送到 Kafka 主题。 问题是 KafkaTemplate 无法序列化 ChenkRequest。
如果我使用:
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
然后出现此错误:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.batch.integration.chunk.ChunkRequest to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
如果我编写自定义序列化程序,例如:
public class CustomSerializer implements Serializer<ChunkRequest<Item>> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, ChunkRequest<Item> data) {
try {
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
和这个配置:
spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer
此错误会引发:
com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (WhosebugError) (through reference chain: org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]
所以,问题是什么!?
org:spring-projects org.springframework.batch.integration.chunk.ChunkRequest
不是 JSON-compatible。考虑编写一个 Serializer<ChunkRequest<Item>>
,它将使用标准 Java 序列化功能进行序列化。您可以使用 Spring 中的 org.springframework.core.serializer.DefaultSerializer.serializeToByteArray()
作为代表。
可能你在另一边需要一个类似的反序列化器...
谢谢阿尔乔姆。 成功了! 我写了一个客户序列化程序:
public class CustomSerializer implements Serializer<Object> {
@Override
public byte[] serialize(String topic, Object data) {
try {
return new DefaultSerializer().serializeToByteArray(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
}
配置:
spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer
和自定义反序列化器:
public class CustomDeserializer implements Deserializer<Object> {
@Override
public Object deserialize(String s, byte[] bytes) {
try {
return new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
配置:
spring.kafka.consumer.value-deserializer=com.example.myremotechunking.batch.CustomDeserializer