MessageConversionException when trying to serialize a List of Avro SpecificRecords and split them with @Splitter

I have a Spring Cloud Stream (Elmhurst release) splitter with a RabbitMQ binder. I'm trying to update it so that it uses an Avro schema for the payloads. I'm getting a conversion exception that makes it look like the Avro converter isn't being invoked, and the JSON converter picks up the message and trips over it.

Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])

I've confirmed that I can create objects with the generated Avro class (maven-avro plugin) and serialize them to disk, so that part seems correct. I've also converted the project from Groovy to Java and still get the same error, so I think that's ruled out as well.

Here's the Avro schema:

{
  "namespace": "com.example.avro",
  "type": "record",
  "name": "SkinnyMessage",
  "doc": "Light message for passing references to Message objects",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "guid",
      "type": "string"
    }
  ]
}

and the relevant part of the class:

@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
class PagingQueryProcessorApplication {
    @Timed(value = 'paging.query')
    @Splitter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    List<SkinnyMessage> queryExecutor(def trigger){
        log.debug 'Building query'
        def query = queryConfiguration.buildQuery().toUriString()

        log.info "Executing query: ${query}"
        def response = service.getRecordings(query)

        log.info "Returning response collection: ${response.body.content.size()}"
        // We build a slim notification on each of the query responses
        def skinnyMessages = response.body.content.collect{
            new SkinnyMessage(it.getLink('self').getHref(), it.content.guid)
        }
        skinnyMessages
    }
...
}

Edit: When I step through with a debugger, I can see that the AvroSchemaRegistryClientMessageConverter call to canConvertTo(payload, headers) fails because the mimeType in the headers is application/json and not application/*+avro, so it moves on and tries the rest of the converter chain.
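As an aside, the wildcard check that trips things up can be illustrated with a small stand-alone sketch. This is not Spring's actual implementation; matchesAvroWildcard is a hypothetical helper that only mimics how a subtype-suffix wildcard like application/*+avro matches (or fails to match) a concrete content type:

```java
// Illustrative sketch, NOT Spring's real converter logic: shows why a header
// of application/json never satisfies the application/*+avro wildcard that
// the Avro converter requires.
public class MimeMatchSketch {

    // Hypothetical helper: true when contentType is "application/<anything>+avro".
    public static boolean matchesAvroWildcard(String contentType) {
        int slash = contentType.indexOf('/');
        if (slash < 0) {
            return false;
        }
        String type = contentType.substring(0, slash);
        String subtype = contentType.substring(slash + 1);
        return type.equals("application") && subtype.endsWith("+avro");
    }

    public static void main(String[] args) {
        System.out.println(matchesAvroWildcard("application/vnd.skinnymessage.v1+avro")); // true
        System.out.println(matchesAvroWildcard("application/json")); // false
    }
}
```

With the header left at application/json, a check of this shape fails, which is consistent with the converter passing the message along the chain until the JSON converter picks it up.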

If I build a Message and set an Avro content-type header on it, that seems to work, but it feels like a hack.

List<Message<SkinnyMessage>> skinnyMessages = response.body.content.collect{
    MessageBuilder.withPayload(
            new SkinnyMessage(it.getLink('self').getHref(), it.content.recordingGuid))
            .setHeader('contentType', 'application/*+avro')
            .build()
}

That creates messages that look correct in the RabbitMQ UI:

contentType: application/vnd.skinnymessage.v1+avro
correlationId: f8be74d6-f780-efcc-295d-338a8b7f2ea0
content_type: application/octet-stream
Payload: 96 bytes
Encoding: string

thttps://example.com/message/2597061H9a688e40-3e30-4b17-80e9-cf4f897e8a91

If I understand the documentation correctly, this should happen transparently from the settings in application.yml (as in the Schema Registry samples):

spring:
  cloud:
    stream:
      bindings:
        output:
          contentType: application/*+avro
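For completeness, a fuller configuration with the registry client pointed at a local schema registry server might look like the sketch below. The schemaRegistryClient.endpoint property name and the port 8990 are my assumptions based on the Schema Registry samples; adjust them for your registry:

```yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          # Wildcard Avro content type; the converter should resolve the
          # concrete vnd.<subject>.v<version>+avro type from the payload class.
          contentType: application/*+avro
      # Assumed property name from the samples -- points the converter's
      # client at the schema registry server.
      schemaRegistryClient:
        endpoint: http://localhost:8990
```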