如何在kafka中创建自定义序列化器?

How to create Custom serializer in kafka?

只有少数可用的序列化器,如

org.apache.kafka.common.serialization.StringSerializer

我们如何创建自己的自定义序列化程序?

您必须创建自己的实现接口 Serializer (org.apache.kafka.common.serialization.Serializer) 的序列化程序,然后为其设置生产者选项 key.serializer / value.serializer

这里有一个使用您自己的 serializer/deserializer 作为 Kafka 消息值的示例。对于 Kafka 消息密钥是一样的。

我们想发送 MyMessage 的序列化版本作为 Kafka 值,并在消费者端再次将其反序列化为 MyMessage 对象。

在生产者端序列化 MyMessage。

您应该创建一个实现 org.apache.kafka.common.serialization.Serializer

的序列化程序 class

serialize() 方法完成工作,接收您的对象并将序列化版本作为字节数组返回。

public class MyValueSerializer implements Serializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        if (message == null) {
            return null;
        }

        try {

            (serialize your MyMessage object into bytes)

            return bytes;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

在消费者端反序列化 MyMessage。

您应该创建一个实现 org.apache.kafka.common.serialization.Deserializer

的反序列化器 class

deserialize() 方法完成工作,接收序列化值作为字节数组并返回您的对象。

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String s, byte[] value)
    {
        if (value == null) {
            return null;
        }

        try {

            (deserialize value into your MyMessage object)

            MyMessage message = new MyMessage();
            return message;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

然后像这样使用它:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {

    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();

    ...
}

没有文字,只有代码

  1. 你发送给 Kafka 的一些对象

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class TestDto {
    
        private String name;
        private String version;
    
    }
    
  2. 创建Serializer,Producer会用到

    @Slf4j
    public class KafkaValueSerializer implements Serializer<TestDto> {
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public byte[] serialize(String topic, TestDto data) {
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                log.error("Unable to serialize object {}", data, e);
                return null;
            }
        }
    
        @Override
        public void close() {
        }
    }
    
  3. 当然,不要忘记消费者的反序列化器

    @Slf4j
    public class KafkaValueDeserializer implements Deserializer<TestDto> {
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public TestDto deserialize(String topic, byte[] data) {
            try {
                return objectMapper.readValue(new String(data, "UTF-8"), TestDto.class);
            } catch (Exception e) {
                log.error("Unable to deserialize message {}", data, e);
                return null;
            }
        }
    
        @Override
        public void close() {
        }
    }
    
  4. 最后一刻,加serializer/deserializer到application.yml

    spring:
        kafka:
          bootstrap-servers:  192.168.192.168:9092
          producer:
              value-serializer: com.package.service.kafka.KafkaValueSerializer
          consumer:
              group-id: groupId
              value-deserializer: com.package.service.kafka.KafkaValueDeserializer
    

就是这样。不需要任何配置文件或手鼓跳舞:)

  1. 发送

    KafkaTemplate<String, TestDto> kafkaTemplate;
    
    TestDto test = new TestDto("test name", "test-version");
    kafkaTemplate.send(topic, testDto);
    
  2. @KafkaListener(topics = "${ktp-agent.kafka.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(TestDto message) {
    
        log.info("Received message '{}' from Kafka.", message.toString());
    }