Spring Cloud Stream Kafka application not generating messages with the correct Avro schema
I have an application with a KStream (spring-boot-shipping-service) that gets OrderCreatedEvent messages generated by an external producer (spring-boot-order-service). This producer uses the following schema:
order-created-event.avsc
{
    "namespace" : "com.codependent.statetransfer.order",
    "type" : "record",
    "name" : "OrderCreatedEvent",
    "fields" : [
        {"name":"id","type":"int"},
        {"name":"productId","type":"int"},
        {"name":"customerId","type":"int"}
    ]
}
My KStream<Int, OrderCreatedEvent> is joined with a KTable<Int, Customer> and publishes a new kind of message to the order topic: OrderShippedEvent.
order-shipped-event.avsc
{
    "namespace" : "com.codependent.statetransfer.order",
    "type" : "record",
    "name" : "OrderShippedEvent",
    "fields" : [
        {"name":"id","type":"int"},
        {"name":"productId","type":"int"},
        {"name":"customerName","type":"string"},
        {"name":"customerAddress","type":"string"}
    ]
}
For some reason the new OrderShippedEvent messages aren't generated with the header application/vnd.ordershippedevent.v1+avro but with application/vnd.ordercreatedevent.v1+avro.
This is the original OrderCreatedEvent in the order topic:
Key (4 bytes): +
Value (4 bytes): V?
Timestamp: 1555943926163
Partition: 0
Offset: 34
Headers: contentType="application/vnd.ordercreatedevent.v1+avro",spring_json_header_types={"contentType":"java.lang.String"}
And the generated OrderShippedEvent with the incorrect schema header:
Key (4 bytes): +
Value (26 bytes): V?
JamesHill Street
Timestamp: 1555943926163
Partition: 0
Offset: 35
Headers: contentType="application/vnd.ordercreatedevent.v1+avro",spring_json_header_types={"contentType":"java.lang.String"}
I've checked the Confluent Schema Registry contents, and the order-shipped-event.avsc schema is registered there:
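For reference, the registered subject can also be checked directly through the registry's REST API (the subject name ordershippedevent matches the one visible in the logs further below):

curl http://localhost:8081/subjects/ordershippedevent/versions/latest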
Why isn't the correct schema being used in the generated message?
Below you can see the full configuration and code of the example, which is also available on Github (https://github.com/codependent/event-carried-state-transfer/tree/avro). To test it, just start a Confluent Platform (v5.2.1) and the spring-boot-customer-service, spring-boot-order-service and spring-boot-shipping-service applications, then execute the following curl commands:
curl -X POST http://localhost:8080/customers -d '{"id":1,"name":"James","address":"Hill Street"}' -H "content-type: application/json"
curl -X POST http://localhost:8084/orders -H "content-type: application/json" -d '{"id":1,"productId":1001,"customerId":1}'
application.yml
server:
  port: 8085

spring:
  application:
    name: spring-boot-shipping-service
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
      bindings:
        input:
          destination: customer
          contentType: application/*+avro
        order:
          destination: order
          contentType: application/*+avro
        output:
          destination: order
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081
ShippingKStreamProcessor
interface ShippingKStreamProcessor {

    @Input("input")
    fun input(): KStream<Int, Customer>

    @Input("order")
    fun order(): KStream<String, OrderCreatedEvent>

    @Output("output")
    fun output(): KStream<String, OrderShippedEvent>
}
ShippingKStreamConfiguration
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderCreatedEvent>): KStream<Int, OrderShippedEvent> {

    val serdeConfig = mapOf(
            AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")

    val intSerde = Serdes.IntegerSerde()
    val customerSerde = SpecificAvroSerde<Customer>()
    customerSerde.configure(serdeConfig, true)
    val orderCreatedSerde = SpecificAvroSerde<OrderCreatedEvent>()
    orderCreatedSerde.configure(serdeConfig, true)
    val orderShippedSerde = SpecificAvroSerde<OrderShippedEvent>()
    orderShippedSerde.configure(serdeConfig, true)

    val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
            Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                    .withKeySerde(intSerde)
                    .withValueSerde(customerSerde)

    val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
            .reduce({ _, y -> y }, stateStore)

    return (orderEvent.filter { _, value -> value is OrderCreatedEvent && value.id != 0 }
            .selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
            .join(customerTable, { orderIt, customer ->
                OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
            }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
            .selectKey { _, value -> value.id }
}
UPDATE: I've set the trace logging level for org.springframework.messaging and apparently everything looks fine:
2019-04-22 23:40:39.953 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : HTTP GET http://localhost:8081/subjects/ordercreatedevent/versions/1
2019-04-22 23:40:39.971 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Accept=[application/json, application/*+json]
2019-04-22 23:40:39.972 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Writing [] as "application/vnd.schemaregistry.v1+json"
2019-04-22 23:40:39.984 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Response 200 OK
2019-04-22 23:40:39.985 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.186 INFO 46039 --- [read-1-producer] org.apache.kafka.clients.Metadata : Cluster ID: 5Sw6sBD0TFOaximF3Or-dQ
2019-04-22 23:40:40.318 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Obtaining schema for class class com.codependent.statetransfer.order.OrderShippedEvent
2019-04-22 23:40:40.318 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Avro type detected, using schema from object
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : HTTP POST http://localhost:8081/subjects/ordershippedevent/versions
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Accept=[application/json, application/*+json]
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Writing [{"schema":"{\"type\":\"record\",\"name\":\"OrderShippedEvent\",\"namespace\":\"com.codependent.statetransfer.order\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"productId\",\"type\":\"int\"},{\"name\":\"customerName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"customerAddress\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"}] as "application/json"
2019-04-22 23:40:40.348 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Response 200 OK
2019-04-22 23:40:40.348 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : HTTP POST http://localhost:8081/subjects/ordershippedevent
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Accept=[application/json, application/*+json]
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Writing [{"schema":"{\"type\":\"record\",\"name\":\"OrderShippedEvent\",\"namespace\":\"com.codependent.statetransfer.order\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"productId\",\"type\":\"int\"},{\"name\":\"customerName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"customerAddress\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"}] as "application/json"
2019-04-22 23:40:40.361 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Response 200 OK
2019-04-22 23:40:40.362 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.362 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Finding correct DatumWriter for type com.codependent.statetransfer.order.OrderShippedEvent
Why then is the message written with the wrong contentType header?
UPDATE 2:
I've kept digging into the source code and found this:
KafkaStreamsMessageConversionDelegate converts the message and determines the correct header values, as seen in the logs above. However, in its serializeOnOutbound method we can see that only the payload is returned to the Kafka API, so the headers aren't taken into account:
return messageConverter.toMessage(message.getPayload(),
        messageHeaders).getPayload();
Further along in the record processing, org.apache.kafka.streams.processor.internals.SinkNode.process() accesses the headers present in the context, which incorrectly contain application/vnd.ordercreatedevent.v1+avro instead of application/vnd.ordershippedevent.v1+avro (?):
collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
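In other words, the sink reuses whatever headers the record being processed already carries. Purely to illustrate those mechanics, here is a hypothetical Kotlin sketch (ContentTypeFixer is a made-up name, not part of the binder or its eventual fix) of a transformer that rewrites the stale header through the processor context just before the sink:

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext

class ContentTypeFixer : Transformer<Int, OrderShippedEvent, KeyValue<Int, OrderShippedEvent>> {

    private lateinit var context: ProcessorContext

    override fun init(context: ProcessorContext) {
        this.context = context
    }

    override fun transform(key: Int, value: OrderShippedEvent): KeyValue<Int, OrderShippedEvent> {
        // Replace the contentType header inherited from the inbound OrderCreatedEvent,
        // so that SinkNode.process() hands the corrected headers to collector.send().
        context.headers().remove("contentType")
        context.headers().add("contentType", "application/vnd.ordershippedevent.v1+avro".toByteArray())
        return KeyValue(key, value)
    }

    override fun close() {
        // nothing to clean up
    }
}

Wiring it in would be a matter of adding .transform({ ContentTypeFixer() }) to the stream right before it is returned.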
UPDATE 3:
Steps to reproduce:
Download and start Confluent 5.2.1:
confluent start
Start the applications spring-boot-order-service, spring-boot-customer-service and spring-boot-shipping-service.
Create a customer:
curl -X POST http://localhost:8080/customers -d '{"id":1,"name":"John","address":"Some Street"}' -H "content-type: application/json"
Create an order that will be joined with the customer:
curl -X POST http://localhost:8084/orders -H "content-type: application/json" -d '{"id":1,"productId":1,"customerId":1}'
ShippingKStreamConfiguration's process() will create a KTable for the customers and a state store (customer-store). It also joins the order stream with the customer KTable, transforming an OrderCreatedEvent into an OrderShippedEvent.
You can check that the newly created OrderShippedEvent messages added to the order topic have an incorrect header. This can be seen either in the Confluent Control Center (localhost:9092 -> topics -> order) or by running kafkacat:
$> kafkacat -b localhost:9092 -t order -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
@codependent This is indeed an issue that we need to address in the binder, which we will fix soon. In the meantime, as a workaround, you can make your processor not return a KStream, but rather do the sending inside the method itself. You can call to(TopicNameExtractor) on the currently returned KStream. The TopicNameExtractor will give you access to the record context, which you can use to manually set the content type.
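A minimal sketch of that workaround, assuming process() builds the same topology but no longer returns the stream (so the @SendTo("output") binding would be dropped); orderShippedStream stands in for the KStream that is currently being returned:

import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.processor.TopicNameExtractor

// Terminate the stream inside the method instead of returning it. The
// TopicNameExtractor receives the record context, whose headers can be
// rewritten manually before the record reaches the producer.
orderShippedStream.to(TopicNameExtractor<Int, OrderShippedEvent> { _, _, recordContext ->
    recordContext.headers().remove("contentType")
    recordContext.headers().add("contentType", "application/vnd.ordershippedevent.v1+avro".toByteArray())
    "order" // keep routing to the order topic
}, Produced.with(intSerde, orderShippedSerde))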