Incompatible Avro messages between Spring Cloud Stream Kafka Streams and native Kafka Streams applications and producers
Sample applications demonstrating the issue can be found at https://github.com/codependent/event-carried-state-transfer/tree/avro:
- kafka-xxx: native applications
- spring-boot-xxx: Spring Cloud Stream applications
The problem is that Avro messages generated by a native Kafka producer can't be unmarshalled by the Spring Cloud Stream applications. For example:
Native Kafka producer (kafka-customer-service project)
@Component
class CustomerProducer {

    private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ProducerConfig.CLIENT_ID_CONFIG] = "kafka-customer-producer"
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = IntegerSerializer::class.java.name
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }

    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }
}
Spring Cloud Stream Kafka Streams application (spring-boot-shipping-service)
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, ...): KStream<Int, OrderShippedEvent> {

    val serdeConfig = mapOf(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")

    val intSerde = Serdes.IntegerSerde()
    val customerSerde = SpecificAvroSerde<Customer>()
    customerSerde.configure(serdeConfig, false)

    val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
        Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
            .withKeySerde(intSerde)
            .withValueSerde(customerSerde)

    val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
        .reduce({ _, y -> y }, stateStore)
    ...
In this case the Spring Cloud Stream application unmarshals an empty Customer DTO: {"id": 0, "name": "", "address": ""}
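For reference, a hedged sketch (not part of the original question) of why the payloads diverge: Confluent's KafkaAvroSerializer prefixes every record with a 5-byte header, and a deserializer that doesn't expect that header reads it as data, which is how a mismatched consumer can end up with an empty or garbage DTO:

import java.nio.ByteBuffer

// Confluent wire format: [magic byte 0x0][4-byte schema id][Avro binary payload].
// This hypothetical helper just peels off that header; a converter unaware of the
// format would instead try to decode these five bytes as record fields.
fun schemaIdOf(payload: ByteArray): Int {
    require(payload.size > 4 && payload[0] == 0.toByte()) { "Unknown magic byte!" }
    return ByteBuffer.wrap(payload, 1, 4).int
}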
Now trying it the other way around, with a Spring Cloud Stream producer and a native Kafka Streams application:
Spring Cloud Stream Kafka Producer (spring-boot-customer-service)
spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
      bindings:
        output:
          destination: customer
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081
---
@Service
class CustomerServiceImpl(private val customerKafkaProducer: Source) : CustomerService {
    ...
    val message = MessageBuilder.withPayload(customer)
        .setHeader(KafkaHeaders.MESSAGE_KEY, customer.id)
        .build()
    customerKafkaProducer.output().send(message)
    ...
Native Kafka Streams application (kafka-shipping-service)
val builder = StreamsBuilder()
val streamsConfiguration = Properties()
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
//streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray()::class.java.name)
//streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java)
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"

val serdeConfig = mapOf(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
    AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
)

//val byteArraySerde = Serdes.ByteArray()
val intSerde = Serdes.IntegerSerde()
val customerSerde = SpecificAvroSerde<Customer>()
customerSerde.configure(serdeConfig, false)

val customerStream = builder.stream<Int, Customer>("customer",
    Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>

val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
    Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
        .withKeySerde(intSerde)
        .withValueSerde(customerSerde)

val customerTable = customerStream
    .map { key, value -> KeyValue(key, value) }
    .groupByKey(Serialized.with(intSerde, customerSerde))
    .reduce({ _, y -> y }, stateStore)
In this case the native application crashes straight away with an exception (org.apache.kafka.common.errors.SerializationException: Unknown magic byte!):
Exception in thread "kafka-shipping-service-b89157ba-b21f-46ba-911d-97f6080d477e-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Disconnected from the target VM, address: '127.0.0.1:57856', transport: 'socket'
Process finished with exit code 0
How can compatibility be ensured between messages generated by Spring Cloud Stream producers and native Kafka producers in a heterogeneous enterprise environment, where the consumers may equally be Spring Cloud Stream Kafka Streams applications or native Kafka Streams applications?
@codependent For your first case - you have a native Kafka producer that uses KafkaAvroSerializer and a Spring Cloud Stream Kafka Streams consumer that uses the Avro deserializer provided by Spring Cloud Stream. That won't work, because you are using incompatible serializers/deserializers. To fix this, on the Spring Cloud Stream side you need to enable useNativeDecoding and provide the Avro Serdes (SpecificAvroSerde). That way you are using the same serialization/deserialization strategy.
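A minimal sketch of what that could look like, assuming the input binding from the processor above (the Serde class names and binder property paths here are my assumptions, not part of the original answer):

spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            useNativeDecoding: true   # hand deserialization to the configured Serdes
      kafka:
        streams:
          binder:
            configuration:
              default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
              schema.registry.url: http://localhost:8081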
For the second case, you get the classic error that shows up when serializers don't match (Unknown magic byte!). Same problem again: you have a Spring Cloud Stream producer that uses the serializers from the framework, but SpecificAvroSerde on the consuming side. To fix it here, you can turn on useNativeEncoding on the producer side and use the Avro serializers. Or wrap the Avro serializer from Spring Cloud Stream in a Serde and provide that to the consumer.
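A sketch of the first option on the producer side, assuming the output binding from the configuration above is kept (the property paths mirror the key serializer entry already shown; the exact values are my assumptions):

spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            useNativeEncoding: true   # bypass the framework's message converter
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081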
I think the bottom line here is that when using Avro as the data-exchange format, you need to ensure the same serialization/deserialization strategy across the whole chain of microservices that rely on that data.
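One hedged way to enforce that (my sketch, not from the original answer) is a single shared Serde factory that every service in the chain, native or Spring Cloud Stream, uses for the customer topic:

object CustomerSerdes {
    // Shared registry and subject-strategy settings, so every service agrees
    // on the Confluent Avro wire format for Customer records.
    private val serdeConfig = mapOf(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
        AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
    )

    fun customerSerde(): SpecificAvroSerde<Customer> =
        SpecificAvroSerde<Customer>().apply { configure(serdeConfig, false) }  // false = value serde
}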