spring-kafka 序列化期间发生 MessageConversionException?
MessageConversionException during serialization in spring-kafka?
为了表达我的意思,我做了一个简单的项目。
依赖关系:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
这就是代码中的所有kafka配置:
@Configuration
public class KafkaSerializationConfig implements SmartInitializingSingleton {
private final KafkaProperties kafkaProperties;
public KafkaSerializationConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Override
public void afterSingletonsInstantiated() {
AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties());
List<NewTopic> newTopics = new ArrayList<>();
newTopics.add(new NewTopic("demo", 2, (short) 2));
client.createTopics(newTopics);
client.close();
}
private static ObjectMapper mapper = new ObjectMapper()
.registerModules(new Jdk8Module(), new JavaTimeModule());
public static class KafkaSerializer extends JsonSerializer<Object> {
public KafkaSerializer() {
super(mapper);
}
}
public static class KafkaDeserializer extends JsonDeserializer<Object> {
public KafkaDeserializer() {
super(mapper);
}
}
}
application.yml
文件:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: sample-cutter
auto-offset-reset: earliest
properties.spring.json.trusted.packages: "*"
key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
value-deserializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.LongSerializer
value-serializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaSerializer
所以这个应用程序可以正常运行了。但是让我们尝试使用它。假设我们有两个服务:服务器和客户端。所以我们有两个 类 具有相同的字段:
客户的 DTO:
public class GettingUser {
private String firstName;
private String lastName;
public GettingUser() {
}
public GettingUser(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
}
服务器的 DTO:
public class SendingUser {
private String firstName;
private String lastName;
public SendingUser() {
}
public SendingUser(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
}
最后让我们尝试使用它:
@RestController
public class SpringSerializationDemoController {
private final KafkaTemplate<Long, SendingUser> template;
private Logger logger = LoggerFactory.getLogger(this.getClass());
public SpringSerializationDemoController(KafkaTemplate<Long, SendingUser> template) {
this.template = template;
}
@GetMapping("/start-demo")
public String startDemo() {
SendingUser user = new SendingUser("John", "Smith");
template.send("demo", user);
return "OK";
}
@KafkaListener(topics = "demo")
public void consumeCutSample(GettingUser user) {
logger.info("Got user: {}", user);
}
}
我可以看到显示我的应用无法将 SendingUser
转换为 GettingUser
的异常。
异常:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.springkafkasimpledemo.controller.SpringSerializationDemoController.consumeCutSample(com.example.springkafkasimpledemo.domain.GettingUser)]
Bean [com.example.springkafkasimpledemo.controller.SpringSerializationDemoController@53601a9c]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2037) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2025) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1924) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1851) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1748) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1472) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1135) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1992) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1974) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
... 13 common frames omitted
但它为什么要这样做呢?我的对象映射器不使用类型。我可以在我的控制台中看到它:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic demo
{"firstName":"John","lastName":"Smith"}
无法理解为什么 spring-kafka 尝试将 SendingUser 转换为 GettingUser。
如果您正在使用 Spring JSON(反)序列化器(两侧),您需要配置类型映射 - 请参阅 https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#json-serde
将源 class 映射到发送端的令牌,并将令牌映射到接收端所需的 class。
或者您可以在反序列化器中禁用 headers 并配置默认类型。参见 setUseTypeHeaders()
。
编辑
对于更复杂的类型,例如仿制药,你应该 configure the deserializer to call a method that returns a JavaType
.
例如List<Foo>
:
public static JavaType returnType(String topic, byte[] data, Headers headers) {
return TypeFactory.defaultInstance()
.constructCollectionLikeType(List.class, Foo.class);
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.Application.returnType
为您的 Event<SourceFile>
使用 TypeReference
或 constructParametricType
。参见 Spring Kafka Consumer consumed message as LinkedHashMap hence automatically converting BigDecimal to double
为了表达我的意思,我做了一个简单的项目。
依赖关系:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
这就是代码中的所有kafka配置:
@Configuration
public class KafkaSerializationConfig implements SmartInitializingSingleton {
private final KafkaProperties kafkaProperties;
public KafkaSerializationConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Override
public void afterSingletonsInstantiated() {
AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties());
List<NewTopic> newTopics = new ArrayList<>();
newTopics.add(new NewTopic("demo", 2, (short) 2));
client.createTopics(newTopics);
client.close();
}
private static ObjectMapper mapper = new ObjectMapper()
.registerModules(new Jdk8Module(), new JavaTimeModule());
public static class KafkaSerializer extends JsonSerializer<Object> {
public KafkaSerializer() {
super(mapper);
}
}
public static class KafkaDeserializer extends JsonDeserializer<Object> {
public KafkaDeserializer() {
super(mapper);
}
}
}
application.yml
文件:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: sample-cutter
auto-offset-reset: earliest
properties.spring.json.trusted.packages: "*"
key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
value-deserializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.LongSerializer
value-serializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaSerializer
所以这个应用程序可以正常运行了。但是让我们尝试使用它。假设我们有两个服务:服务器和客户端。所以我们有两个 类 具有相同的字段:
客户的 DTO:
public class GettingUser {
private String firstName;
private String lastName;
public GettingUser() {
}
public GettingUser(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
}
服务器的 DTO:
public class SendingUser {
private String firstName;
private String lastName;
public SendingUser() {
}
public SendingUser(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
}
最后让我们尝试使用它:
@RestController
public class SpringSerializationDemoController {
private final KafkaTemplate<Long, SendingUser> template;
private Logger logger = LoggerFactory.getLogger(this.getClass());
public SpringSerializationDemoController(KafkaTemplate<Long, SendingUser> template) {
this.template = template;
}
@GetMapping("/start-demo")
public String startDemo() {
SendingUser user = new SendingUser("John", "Smith");
template.send("demo", user);
return "OK";
}
@KafkaListener(topics = "demo")
public void consumeCutSample(GettingUser user) {
logger.info("Got user: {}", user);
}
}
我可以看到显示我的应用无法将 SendingUser
转换为 GettingUser
的异常。
异常:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.springkafkasimpledemo.controller.SpringSerializationDemoController.consumeCutSample(com.example.springkafkasimpledemo.domain.GettingUser)]
Bean [com.example.springkafkasimpledemo.controller.SpringSerializationDemoController@53601a9c]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2037) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2025) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1924) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1851) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1748) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1472) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1135) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1992) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1974) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
... 13 common frames omitted
但它为什么要这样做呢?我的对象映射器不使用类型。我可以在我的控制台中看到它:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic demo
{"firstName":"John","lastName":"Smith"}
无法理解为什么 spring-kafka 尝试将 SendingUser 转换为 GettingUser。
如果您正在使用 Spring JSON(反)序列化器(两侧),您需要配置类型映射 - 请参阅 https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#json-serde
将源 class 映射到发送端的令牌,并将令牌映射到接收端所需的 class。
或者您可以在反序列化器中禁用 headers 并配置默认类型。参见 setUseTypeHeaders()
。
编辑
对于更复杂的类型,例如仿制药,你应该 configure the deserializer to call a method that returns a JavaType
.
例如List<Foo>
:
public static JavaType returnType(String topic, byte[] data, Headers headers) {
return TypeFactory.defaultInstance()
.constructCollectionLikeType(List.class, Foo.class);
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.Application.returnType
为您的 Event<SourceFile>
使用 TypeReference
或 constructParametricType
。参见 Spring Kafka Consumer consumed message as LinkedHashMap hence automatically converting BigDecimal to double