Kafka producer JSON serialization
I'm trying to integrate with Kafka using Spring Cloud Stream. The message being written is a Java POJO, and while it works as expected (the message is written to the topic and I can read it with a consumer application), some unknown characters are prepended to the message, which causes problems when I try to integrate Kafka Connect to consume the messages from the topic.
With the default setup, this is the message pushed to Kafka:
contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}
If I configure the Kafka producer in the Java application instead, the message is written to the topic without the leading characters/headers:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // String keys, JSON-serialized values.
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Message on Kafka:
{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}
Since I'm only setting the key/value serializers, I was hoping to be able to do this in the application.yml properties file rather than in code.
However, when I update the yml to specify the serializers, it doesn't work as I expected, i.e. it doesn't produce the same message as the producer configured in Java (above):
spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer
Message on Kafka:
"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"
Can this be configured via the application yml only? Is there another setting I'm missing?
See headerMode and useNativeEncoding in the producer properties (....session.producer.useNativeEncoding).
headerMode
When set to raw, disables header embedding on output. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when producing data for non-Spring Cloud Stream applications.
Default: embeddedHeaders.
useNativeEncoding
When set to true, the outbound message is serialized directly by client library, which must be configured correspondingly (e.g. setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use appropriate decoder (ex: Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding/decoding is used the headerMode property is ignored and headers will not be embedded into the message.
Default: false.
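As the quoted documentation says, with native encoding the consuming side has to configure a matching deserializer itself. A minimal sketch of what a plain Spring Kafka consumer configuration could look like; the SessionEvent class and the group id are hypothetical, not taken from the post:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, SessionEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "session-consumer");
        // Mirror the producer: String keys, JSON values.
        // SessionEvent stands in for the POJO from the question (hypothetical name).
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(SessionEvent.class));
    }
}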
Credit for the answer above goes to @Gary!
For completeness, here is the configuration that now works for me.
spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          producer:
            useNativeEncoding: true
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer
The spring.kafka.producer.value-serializer property can now be used for this.
YAML:
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Properties:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer