问题序列化 Spring 批处理 Kafka ChunkRequest

Problem Serializing Spring batch Kafka ChunkRequest

我正在编写示例 Spring 批处理 Kafka 集成,带有远程分块。 一开始,master 在一些 Chunk 中读取了一些样本记录(Item.java)。然后将这个块发送到(Spring 集成)通道,Kafka 生产者将这个块发送到 Kafka 主题。 问题是 KafkaTemplate 无法序列化 ChenkRequest。

如果我使用:

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

然后出现此错误:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.batch.integration.chunk.ChunkRequest to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

如果我编写自定义序列化程序,例如:

public class CustomSerializer implements Serializer<ChunkRequest<Item>> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, ChunkRequest<Item> data) {
        try {
            System.out.println("Serializing...");
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }

    @Override
    public void close() {
    }
}

和这个配置:

spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer

此错误会引发:

com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (WhosebugError) (through reference chain: org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]

所以,问题是什么!?

org:spring-projects org.springframework.batch.integration.chunk.ChunkRequest 不是 JSON-compatible。考虑编写一个 Serializer<ChunkRequest<Item>> ,它将使用标准 Java 序列化功能进行序列化。您可以使用 Spring 中的 org.springframework.core.serializer.DefaultSerializer.serializeToByteArray() 作为代表。

可能你在另一边需要一个类似的反序列化器...

谢谢阿尔乔姆。 成功了! 我写了一个客户序列化程序:

public class CustomSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        try {
            return new DefaultSerializer().serializeToByteArray(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }
}

配置:

spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer

和自定义反序列化器:

public class CustomDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String s, byte[] bytes) {
        try {
            return new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}

配置:

spring.kafka.consumer.value-deserializer=com.example.myremotechunking.batch.CustomDeserializer