如何使用 spring-kafka 和 kafka-streams 在 KStreams Bean 中记录偏移量
How to log offset in KStreams Bean using spring-kafka and kafka-streams
我已经通过处理器 API 的 transform() 或 process() 方法提到了几乎所有关于 KStreams 上的日志记录偏移的问题,就像这里的许多问题中提到的那样 -
但是我无法得到这些答案的解决方案,所以我问这个问题。
我想在每次消息被流消费时记录分区、消费者组 ID 和偏移量,我不知道如何将 process() 或 transform() 方法与 ProcessorContext 集成 API?如果我在我的 CustomParser class 中实现 Processor 接口,那么我将不得不实现所有方法,但我不确定这是否可行,就像记录元数据的融合文档中提到的那样 - https://docs.confluent.io/current/streams/developer-guide/processor-api.html#streams-developer-guide-processor-api
我已经在 spring-boot 应用程序中设置了 KStreams,如下所示(更改变量名称以供参考)
@Bean
public Set<KafkaStreams> myKStreamJson(StreamsBuilder profileBuilder) {
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
final KStream<String, JsonNode> pStream = myBuilder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde));
Properties props = streamsConfig.kStreamsConfigs().asProperties();
pstream
.map((key, value) -> {
try {
return CustomParser.parse(key, value);
} catch (Exception e) {
LOGGER.error("Error occurred - " + e.getMessage());
}
return new KeyValue<>(null, null);
}
)
.filter((key, value) -> {
try {
return MessageFilter.filterNonNull(key, value);
} catch (Exception e) {
LOGGER.error("Error occurred - " + e.getMessage());
}
return false;
})
.through(
outputTopic,
Produced.with(Serdes.String(), new JsonPOJOSerde<>(TransformedMessage.class)));
return Sets.newHashSet(
new KafkaStreams(profileBuilder.build(), props)
);
}
实施Transformer
;在 init()
中保存 ProcessorContext
;然后,您可以访问 transform()
中的记录元数据,只需 return 原始 key/value.
这是一个example of a Transformer。它由 Spring 提供,供 Apache Kafka 调用 Spring 集成流来转换 key/value。
我已经通过处理器 API 的 transform() 或 process() 方法提到了几乎所有关于 KStreams 上的日志记录偏移的问题,就像这里的许多问题中提到的那样 -
但是我无法得到这些答案的解决方案,所以我问这个问题。
我想在每次消息被流消费时记录分区、消费者组 ID 和偏移量,我不知道如何将 process() 或 transform() 方法与 ProcessorContext 集成 API?如果我在我的 CustomParser class 中实现 Processor 接口,那么我将不得不实现所有方法,但我不确定这是否可行,就像记录元数据的融合文档中提到的那样 - https://docs.confluent.io/current/streams/developer-guide/processor-api.html#streams-developer-guide-processor-api
我已经在 spring-boot 应用程序中设置了 KStreams,如下所示(更改变量名称以供参考)
@Bean
public Set<KafkaStreams> myKStreamJson(StreamsBuilder profileBuilder) {
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
final KStream<String, JsonNode> pStream = myBuilder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde));
Properties props = streamsConfig.kStreamsConfigs().asProperties();
pstream
.map((key, value) -> {
try {
return CustomParser.parse(key, value);
} catch (Exception e) {
LOGGER.error("Error occurred - " + e.getMessage());
}
return new KeyValue<>(null, null);
}
)
.filter((key, value) -> {
try {
return MessageFilter.filterNonNull(key, value);
} catch (Exception e) {
LOGGER.error("Error occurred - " + e.getMessage());
}
return false;
})
.through(
outputTopic,
Produced.with(Serdes.String(), new JsonPOJOSerde<>(TransformedMessage.class)));
return Sets.newHashSet(
new KafkaStreams(profileBuilder.build(), props)
);
}
实施Transformer
;在 init()
中保存 ProcessorContext
;然后,您可以访问 transform()
中的记录元数据,只需 return 原始 key/value.
这是一个example of a Transformer。它由 Spring 提供,供 Apache Kafka 调用 Spring 集成流来转换 key/value。