尝试序列化 Avro SpecificRecords 列表并使用 @Splitter 拆分它们时出现 MessageConversionException
MessageConversionException when trying to serialize a List of Avro SpecificRecords and split them with @Splitter
我有一个带有 RabbitMQ 绑定器的 Spring Cloud Stream(Elmhurst 版本)拆分器。我正在尝试更新它,以便它将 Avro 模式用于有效负载。我遇到了一个转换异常,这使得 Avro 转换器看起来没有被调用,并且 JSON 转换器获取消息并在其上运行。
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])
我已经确认我可以使用生成的 Avro class(maven-avro 插件)创建 objects 并序列化它们 to-disk,所以这部分看起来是正确的。我已将项目从 Groovy 转换为 Java,但仍然出现相同的错误,所以我认为这也被排除了。
这是 Avro 架构:
{
"namespace": "com.example.avro",
"type": "record",
"name": "SkinnyMessage",
"doc": "Light message for passing references to Message objects",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "guid",
"type": "string"
}
]
}
和class的相关部分:
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
class PagingQueryProcessorApplication {
@Timed(value = 'paging.query')
@Splitter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
List<SkinnyMessage> queryExecutor(def trigger){
log.debug 'Building query'
def query = queryConfiguration.buildQuery().toUriString()
log.info "Executing query: ${query}"
def response = service.getRecordings(query)
log.info "Returning response collection: ${response.body.content.size()}"
// We build a slim notification on each of the query responses
def skinnyMessages = response.body.content.collect{
new SkinnyMessage(it.getLink('self').getHref(), it.content.guid)
}
skinnyMessages
}
...
}
编辑:当我单步执行调试器时,我可以看到 AvroSchemaRegistryClientMessageConverter 调用 canConvertTo(payload, headers)
失败,因为 headers 中的 mimeType
是 application/json
并且不是 application/*+avro
,所以它会继续尝试转换器链的其余部分。
如果我构建一条消息并在其上设置一个 avro content-type header,这似乎可行,但这似乎是一个 hack。
List<Message<SkinnyMessage>> skinnyMessages = response.body.content.collect{
MessageBuilder.withPayload(
new SkinnyMessage(it.getLink('self').getHref(), it.content.recordingGuid))
.setHeader('contentType', 'application/*+avro')
.build()
}
创建在 RabbitMQ 中看起来正确的消息 UI:
contentType: application/vnd.skinnymessage.v1+avro
correlationId: f8be74d6-f780-efcc-295d-338a8b7f2ea0
content_type: application/octet-stream
Payload
96 bytes
Encoding: string
thttps://example.com/message/2597061H9a688e40-3e30-4b17-80e9-cf4f897e8a91
如果我对文档的理解正确,那么这应该从 application.yml 中的设置透明地发生:(如 Schema Registry Samples):
spring:
cloud:
stream:
bindings:
output:
contentType: application/*+avro
我有一个带有 RabbitMQ 绑定器的 Spring Cloud Stream(Elmhurst 版本)拆分器。我正在尝试更新它,以便它将 Avro 模式用于有效负载。我遇到了一个转换异常,这使得 Avro 转换器看起来没有被调用,并且 JSON 转换器获取消息并在其上运行。
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])
我已经确认我可以使用生成的 Avro class(maven-avro 插件)创建 objects 并序列化它们 to-disk,所以这部分看起来是正确的。我已将项目从 Groovy 转换为 Java,但仍然出现相同的错误,所以我认为这也被排除了。
这是 Avro 架构:
{
"namespace": "com.example.avro",
"type": "record",
"name": "SkinnyMessage",
"doc": "Light message for passing references to Message objects",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "guid",
"type": "string"
}
]
}
和class的相关部分:
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
class PagingQueryProcessorApplication {
@Timed(value = 'paging.query')
@Splitter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
List<SkinnyMessage> queryExecutor(def trigger){
log.debug 'Building query'
def query = queryConfiguration.buildQuery().toUriString()
log.info "Executing query: ${query}"
def response = service.getRecordings(query)
log.info "Returning response collection: ${response.body.content.size()}"
// We build a slim notification on each of the query responses
def skinnyMessages = response.body.content.collect{
new SkinnyMessage(it.getLink('self').getHref(), it.content.guid)
}
skinnyMessages
}
...
}
编辑:当我单步执行调试器时,我可以看到 AvroSchemaRegistryClientMessageConverter 调用 canConvertTo(payload, headers)
失败,因为 headers 中的 mimeType
是 application/json
并且不是 application/*+avro
,所以它会继续尝试转换器链的其余部分。
如果我构建一条消息并在其上设置一个 avro content-type header,这似乎可行,但这似乎是一个 hack。
List<Message<SkinnyMessage>> skinnyMessages = response.body.content.collect{
MessageBuilder.withPayload(
new SkinnyMessage(it.getLink('self').getHref(), it.content.recordingGuid))
.setHeader('contentType', 'application/*+avro')
.build()
}
创建在 RabbitMQ 中看起来正确的消息 UI:
contentType: application/vnd.skinnymessage.v1+avro correlationId: f8be74d6-f780-efcc-295d-338a8b7f2ea0 content_type: application/octet-stream Payload 96 bytes Encoding: string
thttps://example.com/message/2597061H9a688e40-3e30-4b17-80e9-cf4f897e8a91
如果我对文档的理解正确,那么这应该从 application.yml 中的设置透明地发生:(如 Schema Registry Samples):
spring:
cloud:
stream:
bindings:
output:
contentType: application/*+avro