写入主题时缺少密钥
Missing key when writing to topic
使用 Java Kafka 流 API。我想 select 为我正在使用的主题创建一个新密钥。新密钥将是不同的类型。当我使用控制台消费者使用新密钥消费主题时,我只能看到值。
valueStream
.map((key, value) -> new KeyValue(value.getBook_id(), value))
.peek((key, value) -> {
System.out.println("key: " + key); // prints key as expected
System.out.println("value: " + value); // prints value as expected
})
.to("foobartopic",
Produced.with(Serdes.Integer(),bookValueIntSerde));
消费结果"foobartopic"
预期结果相同,但使用了非空白键。
您的生产者使用 Serdes.Integer
进行密钥序列化,因此要通过 kafka-console-consumer
以用户友好的格式打印密钥,您必须将 --key-deserializer
设置为适当的值。在你的情况下是 org.apache.kafka.common.serialization.IntegerDeserializer
.
./kafka/bin//kafka-console-consumer.sh --bootstrap-server :9092 --property print.key=true --from-beginning --topic foobartopic --key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
使用 Java Kafka 流 API。我想 select 为我正在使用的主题创建一个新密钥。新密钥将是不同的类型。当我使用控制台消费者使用新密钥消费主题时,我只能看到值。
valueStream
.map((key, value) -> new KeyValue(value.getBook_id(), value))
.peek((key, value) -> {
System.out.println("key: " + key); // prints key as expected
System.out.println("value: " + value); // prints value as expected
})
.to("foobartopic",
Produced.with(Serdes.Integer(),bookValueIntSerde));
消费结果"foobartopic"
预期结果相同,但使用了非空白键。
您的生产者使用 Serdes.Integer
进行密钥序列化,因此要通过 kafka-console-consumer
以用户友好的格式打印密钥,您必须将 --key-deserializer
设置为适当的值。在你的情况下是 org.apache.kafka.common.serialization.IntegerDeserializer
.
./kafka/bin//kafka-console-consumer.sh --bootstrap-server :9092 --property print.key=true --from-beginning --topic foobartopic --key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer