spring-kafka: serialize/deserialize org.springframework.messaging.Message objects using Avro

I want to send and receive Spring Message objects using Avro serdes. I am using org.springframework.messaging.Message because I want to pass headers and modify them. My message payload type is Transaction.

Here is my producer configuration:

@Bean
public ProducerFactory<String, Transaction> transactionProducerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, Transaction> transactionKafkaTemplate() {
    return new KafkaTemplate<>(transactionProducerFactory());
}

Here is my consumer configuration:

@Bean
public ConsumerFactory<String, Transaction> transactionConsumerFactory() {

    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "something");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new AvroDeserializer(Message.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Transaction> transactionListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(transactionConsumerFactory());
    factory.getContainerProperties().setAckOnError(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setErrorHandler(new CustomErrorHandler());
    return factory;
}

I can send messages using KafkaTemplate<String, Transaction> transactionsProducer; this is how I send them:

Transaction transaction = new Transaction();
transaction.setAmount(100.00F);
transaction.setTxnId("134567776433577");
transaction.setTimestamp("2020-03-24-18:30:00.000");
Message<Transaction> message = MessageBuilder.withPayload(transaction)
        .setHeader("messageId", "m-asd-12435435")
        .setHeader(KafkaHeaders.TOPIC, "topic-1")
        .build();

kafkaTemplate.send(message);

But in the KafkaListener I get this error:

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data '[30, 49, 51, 52, 53, 54, 55, 55, 55, 54, 52, 51, 51, 53, 55, 55, -128, -6, -14, -14, -1, 1, 0, 0, -56, 66, -128, -102, -79, -12, -108, 3, 46, 50, 48, 50, 48, 45, 48, 51, 45, 50, 52, 45, 49, 56, 58, 51, 48, 58, 48, 48, 46, 48, 48, 48]' from topic 'transaction-request-upstream'
Caused by: java.lang.InstantiationException: org.springframework.messaging.Message
    at java.lang.Class.newInstance(Class.java:427) ~[na:1.8.0_191]
    at com.ibm.dip.evaluator.configuration.AvroDeserializer.deserialize(AvroDeserializer.java:46) ~[classes/:na]
    at com.ibm.dip.evaluator.configuration.AvroDeserializer.deserialize(AvroDeserializer.java:17) ~[classes/:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access00(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1034) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:990) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.NoSuchMethodException: org.springframework.messaging.Message.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_191]
    at java.lang.Class.newInstance(Class.java:412) ~[na:1.8.0_191]
    ... 18 common frames omitted

Please help.

In the consumerFactory I am providing this: return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new AvroDeserializer(Message.class)); Is this wrong?

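Yes, it should be Transaction.class, not Message.class. The value deserializer only sees the record value, which is the Avro-encoded Transaction payload. Message is an interface with no constructor, which is why newInstance() fails with InstantiationException.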
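To illustrate why the class handed to the deserializer matters, here is a minimal, dependency-free sketch of the reflective instantiation that the stack trace points to. The names `Envelope`, `Transaction`, and `tryInstantiate` are hypothetical stand-ins, not the asker's code: `Envelope` plays the role of the `Message` interface, and `Transaction` plays the role of a payload class with a public no-arg constructor, which Avro-generated classes normally have.

```java
public class ReflectiveInstantiationDemo {

    /** Hypothetical stand-in for org.springframework.messaging.Message: an interface has no constructor. */
    public interface Envelope<T> {
        T getPayload();
    }

    /** Hypothetical stand-in for the payload class; assumed to have a public no-arg constructor. */
    public static class Transaction {
        private String txnId = "";

        public Transaction() {
        }

        public String getTxnId() {
            return txnId;
        }

        public void setTxnId(String txnId) {
            this.txnId = txnId;
        }
    }

    /** Returns "ok" if the type can be created reflectively, otherwise the simple name of the exception. */
    public static String tryInstantiate(Class<?> type) {
        try {
            // Looks up the no-arg constructor and invokes it, roughly what Class.newInstance() does.
            type.getDeclaredConstructor().newInstance();
            return "ok";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("Envelope:    " + tryInstantiate(Envelope.class));
        System.out.println("Transaction: " + tryInstantiate(Transaction.class));
    }
}
```

Applied to the consumer factory in the question, this would mean passing `Transaction.class` to the custom `AvroDeserializer`, assuming its constructor accepts any class it can instantiate this way. The `messageId` header set via `MessageBuilder` is not part of the Avro payload; with spring-kafka's default message conversion it should travel as a Kafka record header, so the listener can read it separately from the deserialized `Transaction`.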