How to get the current Kafka topic inside a Kafka stream?
My scenario is that I have many Kafka topics sharing a common prefix (e.g. house.door, house.room),
and I consume all of them with the Kafka Streams regex topic-pattern API.
Everything looks fine; I receive the key and the message of each record.
To process the data I need the topic name, so that I can join based on it,
but I don't know how to get the topic name inside the Kafka Streams DSL.
One possible workaround is to store the topic name inside the message itself,
but it would be much nicer to get the topic name directly.
So, how do I get the current Kafka topic inside a Kafka stream?
Record metadata is accessible through the Processor API. It is also accessible indirectly through the DSL thanks to its Processor API integration.
With the Processor API, you can access record metadata through a ProcessorContext. You can store a reference to the context in an instance field of your processor during Processor#init(), and then query the processor context within Processor#process() (the same applies to Transformer). The context is updated automatically to match the record that is currently being processed, so methods such as ProcessorContext#partition() and ProcessorContext#topic() always return the current record's metadata. Some caveats apply when calling the processor context from a scheduled punctuate() function; see the Javadocs for details.
If you use the DSL combined with a custom Transformer, for example, you could transform an input record’s value to also include partition and offset metadata, and subsequent DSL operations such as map or filter could then leverage this information.
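As a minimal sketch of that DSL approach (assuming String keys and values and a source stream named textLines; the house.door literal below simply reuses a topic name from the question), the metadata could be embedded like this:

// Sketch only: enrich each value with the source topic, partition, and offset
// through the ProcessorContext captured in init().
final KStream<String, String> enriched = textLines.transformValues(
        () -> new ValueTransformer<String, String>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public String transform(final String value) {
                // The context always reflects the record currently being processed.
                return context.topic() + "/" + context.partition() + "/" + context.offset() + ": " + value;
            }

            @Override
            public void close() {
                // Nothing to clean up.
            }
        });

// Downstream DSL operations can now use the embedded metadata,
// e.g. keep only records that originated from house.door.
enriched.filter((key, value) -> value.startsWith("house.door"))
        .foreach((key, value) -> System.out.println(value));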
To complement Matthias J. Sax's point, I am attaching sample code to show how it can be done.
public static void main(final String[] args) {
    try {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamProcessor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // inputTopicList (a collection of topic names or a regex Pattern) and outputTopic
        // are placeholders for your own topics.
        final KStream<String, String> textLines = streamsBuilder.stream(inputTopicList);

        // transform() re-keys each record with the name of the topic it came from.
        textLines.transform(TopicDetailsTransformer::new)
                .foreach(new ForeachAction<String, String>() {
                    @Override
                    public void apply(final String key, final String value) {
                        System.out.println(key + ": " + value);
                    }
                });
        textLines.to(outputTopic);

        // Build and start the topology.
        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
        streams.start();
    } catch (Exception e) {
        System.out.println(e);
    }
}

private static class TopicDetailsTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String recordKey, final String recordValue) {
        // Return the source topic name as the new key.
        return KeyValue.pair(context.topic(), recordValue);
    }

    @Override
    public void close() {
        // Not needed.
    }
}
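Since the transformed stream is keyed by the source topic name, a rough follow-up sketch (assuming the result of transform() above is captured in a variable, here called topicKeyed, and reusing the topic names from the question) could route records per topic before joining. Note that KStream#branch has been superseded by split() in newer Kafka Streams releases.

// Hypothetical continuation: split the topic-keyed stream per source topic.
final KStream<String, String> topicKeyed = textLines.transform(TopicDetailsTransformer::new);

final KStream<String, String>[] branches = topicKeyed.branch(
        (topic, value) -> "house.door".equals(topic),
        (topic, value) -> "house.room".equals(topic));

final KStream<String, String> doorEvents = branches[0];
final KStream<String, String> roomEvents = branches[1];
// doorEvents and roomEvents can now be re-keyed and joined as needed.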