spring-cloud-stream message conversion exception
While upgrading one of our services to spring-cloud-stream 2.0.0.RC3, we get an exception when trying to consume messages produced by a service that still uses the previous spring-cloud-stream version, Ditmars.RELEASE:
ERROR 31241 --- [container-4-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.watercorp.messaging.types.incoming.UsersDeletedMessage] for GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=user, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}], failedMessage=GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=user, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164)
at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:70)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
It looks like the cause is that the contentType header sent along with the message is text/plain, even though it should be application/json.
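For reference, the content type can also be declared on the concrete binding rather than only under default; a minimal sketch using the user-output binding from the configuration below (this only illustrates the property; as the answer below explains, the actual cause turned out to be a framework bug):
spring:
  cloud:
    stream:
      bindings:
        user-output:
          # per-binding content type; overrides the value set under "default"
          content-type: application/json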
Producer configuration:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          defaultBrokerPort: 9092
          zkNodes: zookeeper
          defaultZkPort: 2181
          minPartitionCount: 2
          replicationFactor: 1
          autoCreateTopics: true
          autoAddPartitions: true
          headers: type,message_id
          requiredAcks: 1
          configuration:
            "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
        bindings:
          user-output:
            producer:
              sync: true
              configuration:
                retries: 10000
      default:
        binder: kafka
        contentType: application/json
        group: user-service
        consumer:
          maxAttempts: 1
        producer:
          partitionKeyExtractorClass: com.watercorp.user_service.messaging.PartitionKeyExtractor
      bindings:
        user-output:
          destination: user
          producer:
            partitionCount: 5
Consumer configuration:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          defaultBrokerPort: 9092
          minPartitionCount: 2
          replicationFactor: 1
          autoCreateTopics: true
          autoAddPartitions: true
          headers: type,message_id
          requiredAcks: 1
          configuration:
            "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
        bindings:
          user-input:
            consumer:
              autoRebalanceEnabled: true
              autoCommitOnError: true
              enableDlq: true
      default:
        binder: kafka
        contentType: application/json
        group: enrollment-service
        consumer:
          maxAttempts: 1
          headerMode: embeddedHeaders
        producer:
          partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
          headerMode: embeddedHeaders
      bindings:
        user-input:
          destination: user
          consumer:
            concurrency: 5
            partitioned: true
Consumer @StreamListener:
@StreamListener(target = UserInput.INPUT, condition = "headers['type']=='" + USERS_DELETED + "'")
public void handleUsersDeletedMessage(@Valid UsersDeletedMessage usersDeletedMessage,
        @Header(value = "kafka_receivedPartitionId", required = false) String partitionId,
        @Header(value = KAFKA_TOPIC_HEADER_NAME, required = false) String topic,
        @Header(MESSAGE_ID_HEADER_NAME) String messageId) throws Throwable {
    logger.info(String.format("Received users deleted message, message id: %s topic: %s partition: %s", messageId, topic, partitionId));
    handleMessageWithRetry(_usersDeletedMessageHandler, usersDeletedMessage, messageId, topic);
}
This is a bug in RC3; it was recently fixed on master and will be in the GA release, due at the end of this month. In the meantime, can you try 2.0.0.BUILD-SNAPSHOT?
I was able to reproduce the issue, and using the snapshot fixed it for me...
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
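Note that BUILD-SNAPSHOT artifacts are not published to Maven Central, so the Spring snapshot repository also needs to be declared in the pom; a minimal sketch (the repository URL is an assumption here, verify it against the Spring documentation):
<!-- Assumption: the Spring snapshot repository is available at this URL -->
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <url>https://repo.spring.io/libs-snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>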
EDIT
For completeness:
Ditmars producer:
@SpringBootApplication
@EnableBinding(Source.class)
public class So49409104Application {

    public static void main(String[] args) {
        SpringApplication.run(So49409104Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            Foo foo = new Foo();
            foo.setBar("bar");
            output.send(new GenericMessage<>(foo));
        };
    }

    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
and
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: so49409104a
          content-type: application/json
          producer:
            header-mode: embeddedHeaders
Elmhurst consumer:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So494091041Application {

    public static void main(String[] args) {
        SpringApplication.run(So494091041Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(Foo foo) {
        System.out.println(foo);
    }

    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
and
spring:
  cloud:
    stream:
      bindings:
        input:
          group: so49409104
          destination: so49409104a
          consumer:
            header-mode: embeddedHeaders
          content-type: application/json
Result:
Foo [bar=bar]
The header-mode setting is needed because the default in 2.0 is native, now that Kafka supports headers natively.
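If several bindings need embedded headers, the same setting can presumably be applied once under the default properties instead of per binding; a minimal sketch, assuming headerMode is honored at the default level like other common producer/consumer properties:
spring:
  cloud:
    stream:
      default:
        producer:
          # assumption: headerMode is accepted under "default" as for other common properties
          header-mode: embeddedHeaders
        consumer:
          header-mode: embeddedHeaders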