Spring Integration Kafka - Sending a basic String
I am trying to send a basic String payload using Spring Integration Kafka v1.2.1, but it fails with the following exception:
2015-09-03 11:50:39.729 ERROR 14418 --- [task-executor-3] [ ] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
My configuration XML looks like this:
<task:executor id="schedule-request-task-executor" pool-size="5" keep-alive="120" queue-capacity="125"/>
<bean id="kafkaStringSerializer" class="org.apache.kafka.common.serialization.StringSerializer"/>
<int-kafka:producer-context id="schedule-request-producer-context">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration topic="schedule.requests"
key-serializer="kafkaStringSerializer"
value-serializer="kafkaStringSerializer"
broker-list="${kafka.brokers}"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
<int-kafka:outbound-channel-adapter
kafka-producer-context-ref="schedule-request-producer-context"
channel="schedule-request-channel">
<int:poller receive-timeout="0"
fixed-delay="100" time-unit="MILLISECONDS"
task-executor="schedule-request-task-executor"/>
</int-kafka:outbound-channel-adapter>
I send the message with the following code:
Message<String> message = MessageBuilder.withPayload("PAYLOAD")
.setHeader("messageKey", "KEY")
.setHeader("topic", "schedule.requests")
.build();
scheduleRequestChannel.send(message);
I looked at the samples at https://github.com/spring-projects/spring-integration-extensions/blob/master/samples/ but they appear to be outdated.
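After debugging through the Spring Integration and Kafka classes, I found that this happens because Spring Integration converts the String to byte[] unless key-class-type and value-class-type are specified in the producer configuration.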
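For anyone wondering why a byte[] reaching a String serializer fails only at runtime, here is a minimal sketch of the mechanism as I understand it. It does not use the Kafka or Spring Integration classes: the Serializer interface and StringSerializer class below are hypothetical stand-ins that only mimic the shape of the real ones, and trySerialize is a made-up helper. The "[B" in the exception message is simply the JVM's name for byte[].

```java
import java.nio.charset.StandardCharsets;

final class SerializerMismatchDemo {

    /** Hypothetical stand-in for a generic serializer interface. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Hypothetical stand-in for a String serializer: encodes the value as UTF-8. */
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Calls the serializer through a raw reference, so the compiler cannot
     * check the value type. The cast to String happens only when the call
     * is made, and a byte[] argument then fails with ClassCastException.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySerialize(Object value) {
        Serializer raw = new StringSerializer();
        try {
            byte[] bytes = raw.serialize("schedule.requests", value);
            return "ok: " + bytes.length + " bytes";
        } catch (ClassCastException e) {
            // Report the runtime class name of the rejected value.
            return "Can't convert value of class " + value.getClass().getName();
        }
    }

    public static void main(String[] args) {
        // A String payload passes straight through.
        System.out.println(trySerialize("PAYLOAD"));
        // A payload already converted to byte[] is rejected.
        System.out.println(trySerialize("PAYLOAD".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In this sketch the String payload is accepted and the byte[] payload is reported with the class name "[B". That matches the symptom in the question: the payload had already been turned into byte[] before it reached the configured StringSerializer.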
In case anyone is interested, here is the updated configuration.
<int-kafka:producer-context id="schedule-request-producer-context">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration topic="schedule.requests"
key-class-type="java.lang.String"
key-serializer="kafkaStringSerializer"
value-class-type="java.lang.String"
value-serializer="kafkaStringSerializer"
broker-list="${kafka.brokers}"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>