使用事务时每个生产者不同 key.serializer

Different key.serializer per producer when using transactions

我正在使用 spring-cloud-stream-kafka,但我不明白如何在使用事务时更改 key.serializer 属性。这是我的配置:

spring:
  cloud:
    stream:
      bindings:
        accountSource:
          contentType: application/*+avro
          destination: account
      kafka:
        binder:
          brokers: ${KAFKA_BOOTSTRAP_ADDRESSES}
          transaction:
            transaction-id-prefix: tx-
            producer:
              configuration:
                retries: 1
                acks: all                
        bindings:
          accountSource:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer # Ignored!
      schema:
        avro:
          subjectNamingStrategy: com.example.CustomSubjectNamingStrategy
      schemaRegistryClient:
        endpoint: ${KAFKA_SCHEMA_REGISTRY_URL}

我读到 here 在绑定器级别设置事务属性时,所有其他特定绑定生产者属性。这是否意味着应用程序中的所有生产者都应该使用相同的 key.serializer?这对我来说似乎很严格。

问题是事务必须由消费者绑定启动(因此它可以将偏移量发送到事务并在 success/failure 之后提交或回滚)。

如果有多个生产者绑定,消费者绑定不知道应用程序将数据发送到哪一个,所以我们必须使用一个全局生产者。

一种解决方案是编写自定义委托序列化程序并在输出消息中设置 header 以告知序列化程序调用哪个委托序列化程序。