Kafka Consumer 获取键值对

Kafka Consumer get key value pair

我目前正在使用 Kafka 和 Flink,我的本地 PC 中有 kafka 运行,我创建了一个正在使用的主题。

Desktop\kafka\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 -topic 测试

但它只是在检索消息,

有没有办法获得有关邮件的更多详细信息?让我们说时间?钥匙?我查看了 kafka 文档,但没有找到有关此主题的内容

使用开箱即用的控制台消费者(我使用的是 Kafka 0.9.0.1),您只能使用不同的格式打印消息的键和值。要打印密钥,请设置 属性 print.key=true.

还有一个 属性 key.separator 默认情况下是“\t”(制表符),您也可以更改为您想要的任何内容。

要设置这些属性,您可以创建配置文件并使用 --consumer.config <config file> 或使用 --property key=value 传递属性。

您也可以实现自己的格式化程序并将其与 --formatter 选项一起使用,但您仍然只有键和值,因为这是 MessageFormatter 特性提供的(请参阅下面的 writeTo)。

trait MessageFormatter {
    def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)

    def init(props: Properties) {}

    def close() {}
}

例如:

./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server kafka-1:9092 --topic topic1 --property print.key=true --property key.separator="-" --from-beginning
key-p1
key-p2
key-p3
null-4
null-8
null-0

使用以下命令:

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_name \ 
      --from-beginning --formatter kafka.tools.DefaultMessageFormatter \
      --property print.key=true --property print.value=true \
      --property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
      --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer