Unable to send Kafka Avro Message to Message Channel <Failed to convert Generic Message to Outbound Message>

We are trying to push Kafka notifications to an external Kafka topic by sending an Avro schema message to a message channel.

When sending a message to the channel, the following exception is thrown:

    Failed to send Message to channel 'DemoChannel'; 
        nested exception is java.lang.IllegalStateException: 
            Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]' to outbound message.
            org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'DemoChannel'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]
            at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)

Below is the configuration of the Kafka topic and message channel in the Spring Boot service's application.yml file.

    cloud:
      stream:
        bindings:
          DemoChannel:
            destination: demoTest
            content-type: avro/bytes
        kafka:
          binder:
            replication-factor: 1
            brokers: ${broker-ip-and-port}
            zkNodes: ${zookeeper-ip-and-port}
            autoCreateTopics: false
            zkConnectionTimeout: 36000

Below is the message channel class file:

    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;

    public interface CustomDemoChannel {
        @Output("DemoChannel")
        MessageChannel customDemoChannel();
    }

Below is the producer code that attempts to send the Avro message to the message channel:

    // Initialized from the autowired CustomDemoChannel variable
    MessageChannel messageChannel;

    // DemoChannel is the class generated by Avro from the Avro schema file.
    // avroSchemaObject is constructed and initialized by the inner Builder class of the generated DemoChannel class.
    DemoChannel avroSchemaObject;

    // The message to be published is built with the payload
    Message<DemoChannel> message = MessageBuilder.withPayload(avroSchemaObject).build();

    // Send the message to the message channel
    messageChannel.send(message);

How can this exception be resolved?

Thanks, Soby!

I resolved the issue with one additional change: defining an AvroSchemaMessageConverter bean and injecting it as a resource into the existing Kafka producer class.

Step 1: Define the AvroSchemaMessageConverter

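    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.util.MimeType;
    // AvroSchemaMessageConverter: import it from the Avro schema module that matches
    // your Spring Cloud Stream version (the package differs between versions).

    @Configuration
    public class AvroNoSchemaRegistryConfiguration {
        public static final String CONVERTER_DEMO = "DemoConverter";

        // Registers a converter for the "avro/bytes" content type used by the binding
        @Bean(name = CONVERTER_DEMO)
        public AvroSchemaMessageConverter demoConverter() {
            AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
            return converter;
        }
    }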

Step 2: Use the AvroSchemaMessageConverter as a resource in the producer class file, as shown below:

    @Resource(name = AvroNoSchemaRegistryConfiguration.CONVERTER_DEMO)
    private AvroSchemaMessageConverter converterDemo;

The resource is autowired by Spring Cloud Stream, and the message is converted to Avro format at runtime, which allows it to be published to the topic.
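To illustrate why registering a converter for `avro/bytes` makes the difference, here is a simplified, framework-free model of content-type based converter lookup. This is only a sketch and not Spring's actual implementation: `PayloadConverter`, `FixedTypeConverter`, and `ConverterRegistry` are hypothetical names, and the "conversion" is just a UTF-8 encoding of `toString()` standing in for real Avro binary encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ConverterLookupSketch {

    /** Hypothetical stand-in for a message converter; not a Spring interface. */
    public interface PayloadConverter {
        boolean supports(String contentType);
        byte[] toBytes(Object payload);
    }

    /** Hypothetical converter that handles exactly one content type. */
    public static class FixedTypeConverter implements PayloadConverter {
        private final String contentType;

        public FixedTypeConverter(String contentType) {
            this.contentType = contentType;
        }

        @Override
        public boolean supports(String candidate) {
            return contentType.equals(candidate);
        }

        @Override
        public byte[] toBytes(Object payload) {
            // Placeholder for real serialization (e.g. Avro binary encoding)
            return payload.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical registry that asks each converter in registration order. */
    public static class ConverterRegistry {
        private final List<PayloadConverter> converters = new ArrayList<>();

        public void register(PayloadConverter converter) {
            converters.add(converter);
        }

        public byte[] convert(Object payload, String contentType) {
            for (PayloadConverter converter : converters) {
                if (converter.supports(contentType)) {
                    return converter.toBytes(payload);
                }
            }
            // No converter claims this content type, so the send fails
            throw new IllegalStateException(
                    "Failed to convert message with contentType=" + contentType + " to outbound message");
        }
    }

    public static void main(String[] args) {
        ConverterRegistry registry = new ConverterRegistry();
        registry.register(new FixedTypeConverter("application/json"));

        try {
            registry.convert("{\"location\":\"US\"}", "avro/bytes");
        } catch (IllegalStateException e) {
            System.out.println("Before registering: " + e.getMessage());
        }

        // Analogous to defining the converter bean in Step 1
        registry.register(new FixedTypeConverter("avro/bytes"));
        byte[] bytes = registry.convert("{\"location\":\"US\"}", "avro/bytes");
        System.out.println("After registering: converted " + bytes.length + " bytes");
    }
}
```

In this model the first `convert` call fails because nothing claims `avro/bytes`, and it succeeds once a matching converter is registered, which mirrors the effect of adding the `DemoConverter` bean above.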