如何在 KafkaStream 应用程序中获取 partitionId 和 TopicName

How to get partitionId and TopicName in KafkaStream application

我们如何从 KafkaStream 获取主题名称和分区 ID。对于任何其他 Kafka 消费者,我们可以获得主题名称和分区 ID,如下所示:

    ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",record.key(), record.value(), record.partition(), record.offset());}

不确定如何在 KafkaStreams 中获取记录引用。

您可以通过在处理器 API 中公开的 ProcessorContext 获取输入记录的元数据。您可以通过 transform() 和类似方法将处理器 API 嵌入到 DSL 中。

查看文档了解详情:https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context