spring-kafka:使用 Avro 序列化/反序列化 org.springframework.messaging.Message 对象
spring-kafka serialize/deserialize org.springframework.messaging.Message objects using Avro
我想使用 Avro 序列化/反序列化器(serdes)来发送和接收 Spring 的 Message 对象。我使用 org.springframework.messaging.Message,是因为我想传递并修改消息头(headers)。
我的消息负载(payload)类型是 Transaction。
这是我的生产者配置
/**
 * Creates the producer factory used to publish {@code Transaction} records.
 * Keys are written with {@code StringSerializer}, values with {@code AvroSerializer}.
 */
@Bean
public ProducerFactory<String, Transaction> transactionProducerFactory() {
    final Map<String, Object> producerProps = new HashMap<>();
    // Serializers for the record key and value.
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
    // Broker to connect to.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new DefaultKafkaProducerFactory<>(producerProps);
}
/** Exposes a {@code KafkaTemplate} backed by {@link #transactionProducerFactory()}. */
@Bean
public KafkaTemplate<String, Transaction> transactionKafkaTemplate() {
    final ProducerFactory<String, Transaction> producerFactory = transactionProducerFactory();
    return new KafkaTemplate<>(producerFactory);
}
这是我的消费者配置
/**
 * Creates the consumer factory for {@code Transaction} records.
 *
 * <p>The value deserializer is constructed for the payload type
 * ({@code Transaction.class}). The earlier version passed {@code Message.class};
 * the listener then failed with
 * {@code InstantiationException: org.springframework.messaging.Message} caused by
 * {@code NoSuchMethodException: Message.<init>()}, because the deserializer
 * instantiates its target type through {@code Class.newInstance()}.
 */
@Bean
public ConsumerFactory<String, Transaction> transactionConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "something");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // The target type must be the concrete payload class, not the Message wrapper.
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new AvroDeserializer(Transaction.class));
}
/**
 * Creates the listener container factory for {@code Transaction} consumers:
 * manual ack mode, ack-on-error enabled, and a {@code CustomErrorHandler}.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Transaction> transactionListenerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, Transaction> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(transactionConsumerFactory());

    // Configure acknowledgement behaviour on the factory's container properties.
    final ContainerProperties containerProps = listenerFactory.getContainerProperties();
    containerProps.setAckOnError(true);
    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);

    listenerFactory.setErrorHandler(new CustomErrorHandler());
    return listenerFactory;
}
我可以使用 KafkaTemplate<String, Transaction> transactionsProducer; 发送消息,发送方式如下:
// Build the payload.
Transaction txn = new Transaction();
txn.setAmount(100.00F);
txn.setTxnId("134567776433577");
txn.setTimestamp("2020-03-24-18:30:00.000");

// Wrap the payload in a Spring Message carrying a custom header and the target topic.
MessageBuilder<Transaction> builder = MessageBuilder.withPayload(txn);
builder.setHeader("messageId", "m-asd-12435435");
builder.setHeader(KafkaHeaders.TOPIC, "topic-1");
Message<Transaction> outbound = builder.build();

kafkaTemplate.send(outbound);
但是在 KafkaListener 中我得到了这个错误
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data '[30, 49, 51, 52, 53, 54, 55, 55, 55, 54, 52, 51, 51, 53, 55, 55, -128, -6, -14, -14, -1, 1, 0, 0, -56, 66, -128, -102, -79, -12, -108, 3, 46, 50, 48, 50, 48, 45, 48, 51, 45, 50, 52, 45, 49, 56, 58, 51, 48, 58, 48, 48, 46, 48, 48, 48]' from topic 'transaction-request-upstream'
Caused by: java.lang.InstantiationException: org.springframework.messaging.Message
at java.lang.Class.newInstance(Class.java:427) ~[na:1.8.0_191]
at com.ibm.dip.evaluator.configuration.AvroDeserializer.deserialize(AvroDeserializer.java:46) ~[classes/:na]
at com.ibm.dip.evaluator.configuration.AvroDeserializer.deserialize(AvroDeserializer.java:17) ~[classes/:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access00(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1034) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:990) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.NoSuchMethodException: org.springframework.messaging.Message.<init>()
at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_191]
at java.lang.Class.newInstance(Class.java:412) ~[na:1.8.0_191]
... 18 common frames omitted
请帮忙。
In the consumerFactory I am providing this return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new AvroDeserializer(Message.class));
Is this wrong?
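是的,这是错的:应该传入 Transaction.class(消息负载类型),而不是 Message.class。Message 是一个接口,没有无参构造函数,因此 AvroDeserializer 在调用 newInstance() 时会抛出 InstantiationException。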
我想使用 Avro 序列化/反序列化器(serdes)来发送和接收 Spring 的 Message 对象。我使用 org.springframework.messaging.Message,是因为我想传递并修改消息头(headers)。我的消息负载(payload)类型是 Transaction。
这是我的生产者配置
/**
 * Creates the producer factory used to publish {@code Transaction} records.
 * Keys are written with {@code StringSerializer}, values with {@code AvroSerializer}.
 */
@Bean
public ProducerFactory<String, Transaction> transactionProducerFactory() {
    final Map<String, Object> producerProps = new HashMap<>();
    // Serializers for the record key and value.
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
    // Broker to connect to.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new DefaultKafkaProducerFactory<>(producerProps);
}
/** Exposes a {@code KafkaTemplate} backed by {@link #transactionProducerFactory()}. */
@Bean
public KafkaTemplate<String, Transaction> transactionKafkaTemplate() {
    final ProducerFactory<String, Transaction> producerFactory = transactionProducerFactory();
    return new KafkaTemplate<>(producerFactory);
}
这是我的消费者配置
/**
 * Creates the consumer factory for {@code Transaction} records.
 *
 * <p>The value deserializer is constructed for the payload type
 * ({@code Transaction.class}). The earlier version passed {@code Message.class};
 * the listener then failed with
 * {@code InstantiationException: org.springframework.messaging.Message} caused by
 * {@code NoSuchMethodException: Message.<init>()}, because the deserializer
 * instantiates its target type through {@code Class.newInstance()}.
 */
@Bean
public ConsumerFactory<String, Transaction> transactionConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "something");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // The target type must be the concrete payload class, not the Message wrapper.
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new AvroDeserializer(Transaction.class));
}
/**
 * Creates the listener container factory for {@code Transaction} consumers:
 * manual ack mode, ack-on-error enabled, and a {@code CustomErrorHandler}.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Transaction> transactionListenerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, Transaction> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(transactionConsumerFactory());

    // Configure acknowledgement behaviour on the factory's container properties.
    final ContainerProperties containerProps = listenerFactory.getContainerProperties();
    containerProps.setAckOnError(true);
    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);

    listenerFactory.setErrorHandler(new CustomErrorHandler());
    return listenerFactory;
}
我可以使用 KafkaTemplate<String, Transaction> transactionsProducer; 发送消息,发送方式如下:
// Build the payload.
Transaction txn = new Transaction();
txn.setAmount(100.00F);
txn.setTxnId("134567776433577");
txn.setTimestamp("2020-03-24-18:30:00.000");

// Wrap the payload in a Spring Message carrying a custom header and the target topic.
MessageBuilder<Transaction> builder = MessageBuilder.withPayload(txn);
builder.setHeader("messageId", "m-asd-12435435");
builder.setHeader(KafkaHeaders.TOPIC, "topic-1");
Message<Transaction> outbound = builder.build();

kafkaTemplate.send(outbound);
但是在 KafkaListener 中我得到了这个错误
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data '[30, 49, 51, 52, 53, 54, 55, 55, 55, 54, 52, 51, 51, 53, 55, 55, -128, -6, -14, -14, -1, 1, 0, 0, -56, 66, -128, -102, -79, -12, -108, 3, 46, 50, 48, 50, 48, 45, 48, 51, 45, 50, 52, 45, 49, 56, 58, 51, 48, 58, 48, 48, 46, 48, 48, 48]' from topic 'transaction-request-upstream'
Caused by: java.lang.InstantiationException: org.springframework.messaging.Message
at java.lang.Class.newInstance(Class.java:427) ~[na:1.8.0_191]
at com.ibm.dip.evaluator.configuration.AvroDeserializer.deserialize(AvroDeserializer.java:46) ~[classes/:na]
at com.ibm.dip.evaluator.configuration.AvroDeserializer.deserialize(AvroDeserializer.java:17) ~[classes/:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access00(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1034) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:990) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.NoSuchMethodException: org.springframework.messaging.Message.<init>()
at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_191]
at java.lang.Class.newInstance(Class.java:412) ~[na:1.8.0_191]
... 18 common frames omitted
请帮忙。
In the consumerFactory I am providing this
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new AvroDeserializer(Message.class));
Is this wrong?
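是的,这是错的:应该传入 Transaction.class(消息负载类型),而不是 Message.class。Message 是一个接口,没有无参构造函数,因此 AvroDeserializer 在调用 newInstance() 时会抛出 InstantiationException。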