Spring Cloud Stream Kafka - EmbeddedHeadersMessageConverter - java.lang.StringIndexOutOfBoundsException: 字符串索引超出范围

Spring Cloud Stream Kafka - EmbeddedHeadersMessageConverter - java.lang.StringIndexOutOfBoundsException: String index out of range in

我有一个 Spring 启动应用程序 (app0),它使用 Spring Cloud Stream Kafka 来读取主题。

还有另外两个应用程序(app1app2)会针对该主题生成消息。

app1 使用接口 OrderSource:

发布消息
public interface OrderSource{ 

    String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS;

    @Output(OrderSource.OUTPUT_PAYMENT)
    MessageChannel output();

例如:

orderSource.output().send(MessageBuilder.withPayload(order).build(), 500);

在这种情况下,app0 可以毫无问题地读取来自 app1 的消息。

app2 使用 KafkaTemplate 发布消息:

ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }");
try {
    SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS);

在这种情况下,我观​​察到来自 EmbeddedHeadersMessageConverter 的以下异常:

java.lang.StringIndexOutOfBoundsException: String index out of range: 152
    at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91]
    at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]

显然它正试图从消息的有效负载中提取 headers。如何在支持两种消息源(KafkaTemplate 和 OrderSource)的同时防止发生此异常。

要与非 Spring-Cloud-Stream 应用通信,您需要将消费者上的 headerMode 配置为 raw

您还需要对 app1 的生产者执行相同的操作,这样他就不会嵌入 headers.

参见consumer properties and producer properties