使用事务时每个生产者不同 key.serializer
Different key.serializer per producer when using transactions
我正在使用 spring-cloud-stream-kafka,但我不明白如何在使用事务时更改 key.serializer
属性。这是我的配置:
spring:
cloud:
stream:
bindings:
accountSource:
contentType: application/*+avro
destination: account
kafka:
binder:
brokers: ${KAFKA_BOOTSTRAP_ADDRESSES}
transaction:
transaction-id-prefix: tx-
producer:
configuration:
retries: 1
acks: all
bindings:
accountSource:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer # Ignored!
schema:
avro:
subjectNamingStrategy: com.example.CustomSubjectNamingStrategy
schemaRegistryClient:
endpoint: ${KAFKA_SCHEMA_REGISTRY_URL}
我读到 here 在绑定器级别设置事务属性时,所有其他特定绑定生产者属性。这是否意味着应用程序中的所有生产者都应该使用相同的 key.serializer
?这对我来说似乎很严格。
问题是事务必须由消费者绑定启动(因此它可以将偏移量发送到事务并在 success/failure 之后提交或回滚)。
如果有多个生产者绑定,消费者绑定不知道应用程序将数据发送到哪一个,所以我们必须使用一个全局生产者。
一种解决方案是编写自定义委托序列化程序并在输出消息中设置 header 以告知序列化程序调用哪个委托序列化程序。
我正在使用 spring-cloud-stream-kafka,但我不明白如何在使用事务时更改 key.serializer
属性。这是我的配置:
spring:
cloud:
stream:
bindings:
accountSource:
contentType: application/*+avro
destination: account
kafka:
binder:
brokers: ${KAFKA_BOOTSTRAP_ADDRESSES}
transaction:
transaction-id-prefix: tx-
producer:
configuration:
retries: 1
acks: all
bindings:
accountSource:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer # Ignored!
schema:
avro:
subjectNamingStrategy: com.example.CustomSubjectNamingStrategy
schemaRegistryClient:
endpoint: ${KAFKA_SCHEMA_REGISTRY_URL}
我读到 here 在绑定器级别设置事务属性时,所有其他特定绑定生产者属性。这是否意味着应用程序中的所有生产者都应该使用相同的 key.serializer
?这对我来说似乎很严格。
问题是事务必须由消费者绑定启动(因此它可以将偏移量发送到事务并在 success/failure 之后提交或回滚)。
如果有多个生产者绑定,消费者绑定不知道应用程序将数据发送到哪一个,所以我们必须使用一个全局生产者。
一种解决方案是编写自定义委托序列化程序并在输出消息中设置 header 以告知序列化程序调用哪个委托序列化程序。