Unable to send Kafka Avro Message to Message Channel <Failed to convert Generic Message to Outbound Message>
We are trying to push Kafka notifications to an external Kafka topic by sending an Avro schema message to a message channel.
When the message is sent to the channel, the following exception occurs:
Failed to send Message to channel 'DemoChannel';
nested exception is java.lang.IllegalStateException:
Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]' to outbound message.
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'DemoChannel'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
Below is the configuration of the Kafka topic and the message channel in the application.yml file of the Spring Boot service.
cloud:
  stream:
    bindings:
      DemoChannel:
        destination: demoTest
        content-type: avro/bytes
    kafka:
      binder:
        replication-factor: 1
        brokers: ${broker-ip-and-port}
        zkNodes: ${zookeeper-ip-and-port}
        autoCreateTopics: false
        zkConnectionTimeout: 36000
Below is the message channel class file:
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface CustomDemoChannel {
    @Output("DemoChannel")
    MessageChannel customDemoChannel();
}
Below is the producer code that tries to send the Avro message to the message channel:
// Initialized from the autowired CustomDemoChannel variable
MessageChannel messageChannel;
// DemoChannel is the class generated by Avro from the Avro schema file.
// avroSchemaObject is constructed and initialized with the inner Builder class of the generated DemoChannel class.
DemoChannel avroSchemaObject;
// Build the message to be published, with the Avro object as payload
Message<DemoChannel> message = MessageBuilder.withPayload(avroSchemaObject).build();
// Send the message to the message channel
messageChannel.send(message);
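How can this exception be resolved?
Thanks, Soby!
The issue was resolved with one additional change: defining an Avro schema converter (AvroSchemaMessageConverter) and including it as a resource in the existing Kafka producer class.
Step 1: Define the AvroSchemaMessageConverter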
@Configuration
public class AvroNoSchemaRegistryConfiguration {
    public static final String CONVERTER_DEMO = "DemoConverter";

    @Bean(name = CONVERTER_DEMO)
    public AvroSchemaMessageConverter demoConverter() {
        AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
        return converter;
    }
}
Step 2: Use the AvroSchemaMessageConverter as a resource in the producer class file, as shown below:
@Resource(name = AvroNoSchemaRegistryConfiguration.CONVERTER_DEMO)
private AvroSchemaMessageConverter converterDemo;
The resource is autowired by Spring Cloud Stream, and the message is converted to Avro format at runtime, which allows it to be published to the topic.
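To illustrate why registering a converter for avro/bytes makes the exception go away, here is a minimal plain-Java sketch of content-type-based converter lookup. It is a simplified model only, not Spring's actual implementation: ConverterLookupSketch, PayloadConverter, ConverterRegistry and forContentType are hypothetical names invented for this example, and the placeholder converter emits UTF-8 text rather than real Avro binary.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ConverterLookupSketch {

    /** Hypothetical stand-in for a converter that handles exactly one content type. */
    interface PayloadConverter {
        boolean supports(String contentType);
        byte[] toBytes(Object payload);
    }

    /** Hypothetical registry: tries each converter in order and fails if none matches. */
    static final class ConverterRegistry {
        private final List<PayloadConverter> converters = new ArrayList<>();

        void register(PayloadConverter converter) {
            converters.add(converter);
        }

        byte[] convert(Object payload, String contentType) {
            for (PayloadConverter converter : converters) {
                if (converter.supports(contentType)) {
                    return converter.toBytes(payload);
                }
            }
            // No converter claimed this content type, so the send cannot proceed.
            throw new IllegalStateException(
                    "Failed to convert message with contentType=" + contentType);
        }
    }

    /** Builds a placeholder converter; it emits UTF-8 text, NOT real Avro binary. */
    static PayloadConverter forContentType(String supportedType) {
        return new PayloadConverter() {
            @Override
            public boolean supports(String contentType) {
                return supportedType.equals(contentType);
            }

            @Override
            public byte[] toBytes(Object payload) {
                return String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
            }
        };
    }

    public static void main(String[] args) {
        ConverterRegistry registry = new ConverterRegistry();
        registry.register(forContentType("application/json"));

        // Nothing is registered for avro/bytes yet, so the lookup fails.
        try {
            registry.convert("{\"location\":\"US\"}", "avro/bytes");
        } catch (IllegalStateException e) {
            System.out.println("Before registration: " + e.getMessage());
        }

        // Registering a converter for avro/bytes lets the same call succeed.
        registry.register(forContentType("avro/bytes"));
        byte[] bytes = registry.convert("{\"location\":\"US\"}", "avro/bytes");
        System.out.println("After registration: " + bytes.length + " bytes");
    }
}
```

In this model, the failing case corresponds to the original setup, where the binding declared content-type avro/bytes but no converter for that MIME type was available. The succeeding case corresponds to the setup after the DemoConverter bean was defined.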