Kafka producer JSON serialization

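I'm trying to integrate with Kafka using Spring Cloud Stream. The messages being written are Java POJOs. This works as expected: the messages are written to the topic, and I can read them with a consumer application. However, some unknown characters are added to the beginning of each message, and these cause problems when I try to integrate Kafka Connect to sink the messages from the topic.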

With the default settings, this is the message pushed to Kafka:

     contentType   "text/plain"originalContentType    "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}

If I configure the Kafka producer in the Java application instead, the message is written to the topic without the leading characters/headers:

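import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Plain Spring Kafka producer: keys are written as strings, values as JSON,
// with no Spring Cloud Stream header embedding in front of the payload.
@Configuration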
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092");
        configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
        configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);

        return new DefaultKafkaProducerFactory<String, Object>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

The message on Kafka:

{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}

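Since I'm only setting the key/value serializers, I'd like to do this in the application.yml properties file rather than in code. However, when I update the yml to specify the serializers, it doesn't work as I expected: it does not produce the same message as the producer configured in Java (above):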

spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

The message on Kafka:

"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"
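The string above is the record value printed as Base64. Decoding it shows where the "unknown characters" come from: an embedded-headers prefix written in front of the JSON. The Java sketch below parses that prefix. The class name `EmbeddedHeaderDecoder` is hypothetical. The byte layout is inferred from these particular bytes, not taken from the Spring Cloud Stream source. The inferred layout is:

- a 0xFF marker;
- a one-byte header count;
- for each header, a one-byte name length, the name, a four-byte big-endian value length, and the value;
- the payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Spring Cloud Stream). It parses the
// embedded-headers layout as inferred from the record shown in the question.
class EmbeddedHeaderDecoder {

    // The record value from the question, Base64-encoded as printed.
    static final String RECORD =
        "/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4i"
        + "E29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYt"
        + "OCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7"
        + "ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQt"
        + "MTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwi"
        + "Y3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19";

    static final class Decoded {
        final Map<String, String> headers = new LinkedHashMap<>();
        String payload;
    }

    static Decoded decode(byte[] bytes) {
        Decoded result = new Decoded();
        if (bytes.length == 0 || bytes[0] != (byte) 0xFF) {
            // No 0xFF marker: treat the whole value as the payload.
            result.payload = new String(bytes, StandardCharsets.UTF_8);
            return result;
        }
        // Skip the marker; ByteBuffer reads big-endian by default.
        ByteBuffer buf = ByteBuffer.wrap(bytes, 1, bytes.length - 1);
        int count = buf.get() & 0xFF;                   // one-byte header count
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.get() & 0xFF];   // one-byte name length
            buf.get(name);
            byte[] value = new byte[buf.getInt()];      // four-byte value length
            buf.get(value);
            result.headers.put(new String(name, StandardCharsets.UTF_8),
                               new String(value, StandardCharsets.UTF_8));
        }
        byte[] payload = new byte[buf.remaining()];     // everything left is the payload
        buf.get(payload);
        result.payload = new String(payload, StandardCharsets.UTF_8);
        return result;
    }

    public static void main(String[] args) {
        Decoded d = decode(Base64.getDecoder().decode(RECORD));
        System.out.println(d.headers);
        System.out.println(d.payload);
    }
}
```

Running `main` first prints the two embedded headers, `{contentType="text/plain", originalContentType="application/json;charset=UTF-8"}`, and then the plain JSON payload. A consumer that does not know this layout, such as a Kafka Connect sink, treats the whole byte sequence as the value. That is consistent with the problems described above.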

Is it possible to configure this through application.yml alone? Am I missing some other setting?

See headerMode and useNativeEncoding in the producer properties (....session.producer.useNativeEncoding):

headerMode

When set to raw, disables header embedding on output. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when producing data for non-Spring Cloud Stream applications.

Default: embeddedHeaders.

useNativeEncoding

When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (e.g. by setting an appropriate Kafka producer value serializer). When this configuration is used, outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (e.g. a Kafka consumer value deserializer) to deserialize the inbound message. Also, when native encoding/decoding is used, the headerMode property is ignored and headers are not embedded into the message.

Default: false.
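The following hypothetical Java model shows what the two properties decide on the producer side. `OutboundRecordSketch` and `recordValue` are illustrative names, not binder API.

- With headerMode=embeddedHeaders and native encoding off, the headers are prepended using the same layout that appears in the Base64 record in the question.
- With headerMode=raw, or with useNativeEncoding=true (which ignores headerMode), the payload bytes are written unchanged.

The sketch does not model the other half of native encoding: handing serialization over to the configured Kafka value serializer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the producer-side choice described above; this is
// not Spring Cloud Stream's implementation. The byte layout mirrors the
// record shown in the question.
class OutboundRecordSketch {

    static byte[] recordValue(byte[] payload, Map<String, String> headers,
                              boolean useNativeEncoding, String headerMode) {
        // Native encoding ignores headerMode; "raw" disables embedding.
        if (useNativeEncoding || "raw".equals(headerMode)) {
            return payload;
        }
        return embed(payload, headers);
    }

    static byte[] embed(byte[] payload, Map<String, String> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0xFF);                 // marker byte
        out.write(headers.size());       // header count (assumed to fit in one byte)
        for (Map.Entry<String, String> h : headers.entrySet()) {
            byte[] name = h.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] value = h.getValue().getBytes(StandardCharsets.UTF_8);
            out.write(name.length);      // one-byte name length
            out.write(name, 0, name.length);
            // four-byte big-endian value length
            out.write(ByteBuffer.allocate(4).putInt(value.length).array(), 0, 4);
            out.write(value, 0, value.length);
        }
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("contentType", "\"text/plain\"");
        headers.put("originalContentType", "\"application/json;charset=UTF-8\"");
        byte[] json = "{\"payload\":{\"username\":\"john\"}}".getBytes(StandardCharsets.UTF_8);

        byte[] embedded = recordValue(json, headers, false, "embeddedHeaders");
        byte[] nativeValue = recordValue(json, headers, true, "embeddedHeaders");
        // The embedded form starts with the same "/wIL" prefix as the question's record.
        System.out.println(Base64.getEncoder().encodeToString(embedded).substring(0, 4));
        System.out.println(new String(nativeValue, StandardCharsets.UTF_8));
    }
}
```

In the embedded case, the first three bytes are 0xFF, 0x02 (two headers) and 0x0B (the length of "contentType"). That is exactly where the leading "/wIL" in the question's Base64 output comes from.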

Credit for the answer above goes to @Gary!

For completeness, here is the configuration that now works for me:

spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          producer:
            useNativeEncoding: true
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

Nowadays, the spring.kafka.producer.value-serializer property can also be used:

yml:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Properties:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer