Getting java.lang.ClassCastException: [B cannot be cast to org.springframework.messaging.Message exception after consuming batch
I am consuming messages in batch mode with spring-cloud-stream-kafka-binder 3.0.4. After consuming a batch, I map each Message to its payload object, but I get the exception above.
The code is as follows:
@StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
public void handleActivity(List<Message<Event>> messages,
        @Header(name = "deliveryAttempt", defaultValue = "1") int deliveryAttempt,
        @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)
{
    try
    {
        log.info("Received activity message with message length {} attempt {}",
                messages.size(), deliveryAttempt);
        List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());
        nodeConfigActivityBatchProcessor.processNodeConfigActivity(eventList);
        acknowledgment.acknowledge();
        log.debug("Processed activity message {} successfully!!", messages.size());
    }
    catch (Exception e)
    {
        throw e;
    }
}
Configuration:
spring.cloud.stream.bindings.activity-input-channel.destination=TOPIC.FEED.NAME
spring.cloud.stream.bindings.activity-input-channel.contentType=application/json
spring.cloud.stream.bindings.activity-input-channel.consumer.batch-mode=true
spring.cloud.stream.bindings.activity-input-channel.consumer.max-attempts=1
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.auto-commit-offset=false
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.reset-offsets=true
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.start-offset=latest
spring.kafka.consumer.max-poll-records=5
spring.kafka.consumer.fetch-max-wait=60000
spring.kafka.consumer.fetch-min-size=500
I get the error above at `.collect(Collectors.toList())` on the line `List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());` and I don't understand why.
If I inspect `Message<Event> eventMessage = messages.get(0)`, I get the same exception (messages is the List parameter of the listener).
If batch mode is false and the listener consumes a single message, `handleActivity(Message<Event> message)`, it works fine with no exception.
Do I need to add a deserializer when using batch mode?
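For context, `[B` in the exception message is the JVM's runtime name for `byte[]`: in batch mode the record values reach the listener as raw byte arrays because no value deserializer converted them, so the cast to `Message` fails. A minimal standalone sketch (plain Java, no Spring) showing where the `[B` name comes from:

```java
public class ByteArrayClassNameDemo {
    public static void main(String[] args) {
        // A raw Kafka record value is just a byte array.
        Object payload = "{\"id\":42}".getBytes();

        // The JVM reports byte[] as "[B" - exactly the name seen in
        // "ClassCastException: [B cannot be cast to ...Message".
        System.out.println(payload.getClass().getName()); // prints [B

        // Casting it to anything other than byte[] fails at runtime:
        // Message<Event> m = (Message<Event>) payload;  // would throw ClassCastException
    }
}
```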
I managed to resolve the exception by adding a deserializer.
Below is my batch listener: instead of consuming `List<Message<Event>> messages` as in the question, it consumes `List<Event> messages`.
@StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
public void handleActivity(List<Event> messages,
        @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)
{
    try
    {
        log.info("Received activity message with message length {}", messages.size());
        nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
        acknowledgment.acknowledge();
        log.debug("Processed activity message {} successfully!!", messages.size());
    }
    catch (Exception e)
    {
        throw e;
    }
}
Added the following deserializer (the empty subclass is enough because JsonDeserializer resolves the target type Event from the generic parameter):
public class EventDeserializer extends JsonDeserializer<Event> {
}
and set it as the value.deserializer in the properties file:
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.configuration.value.deserializer=com.sample.messaging.util.EventDeserializer
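As an alternative sketch (not from the answer above): spring-kafka's stock JsonDeserializer is configurable via its `spring.json.*` properties, so a custom subclass can often be avoided by naming the default target type instead — assuming here that the Event class lives in `com.sample.messaging.util`:

```properties
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.configuration.spring.json.value.default.type=com.sample.messaging.util.Event
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.configuration.spring.json.trusted.packages=com.sample.messaging.util
```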
With that in place, I get a list of events in the batch listener.
Unfortunately it isn't documented anywhere :-( , but if you want the incoming events wrapped in org.springframework.messaging.Message (e.g. to access the record keys), this is the correct signature when consuming in batch mode:
@Bean
public Consumer<Message<List<String>>> sink() {
    return messages -> {
        List<?> keys = messages.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class);
        log.info("headers {}", messages.getHeaders());
        keys.forEach(k -> log.info("key {}", k));
        messages.getPayload().forEach(string -> log.info("processing {}", string));
    };
}
Note that the signature is the other way round: Message<List<?>>, not List<Message<?>>!
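For completeness, a sketch of the binding configuration such a functional consumer typically needs. The binding name `sink-in-0` follows Spring Cloud Stream's functional naming convention (`<functionName>-in-<index>`); the group name here is illustrative:

```properties
spring.cloud.function.definition=sink
spring.cloud.stream.bindings.sink-in-0.destination=TOPIC.FEED.NAME
spring.cloud.stream.bindings.sink-in-0.group=activity-group
spring.cloud.stream.bindings.sink-in-0.consumer.batch-mode=true
```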