如何使用 spring-kafka 和 kafka-streams 在 KStreams Bean 中记录偏移量

How to log offset in KStreams Bean using spring-kafka and kafka-streams

我已经通过处理器 API 的 transform() 或 process() 方法提到了几乎所有关于 KStreams 上的日志记录偏移的问题,就像这里的许多问题中提到的那样 -

但是我无法得到这些答案的解决方案,所以我问这个问题。

我想在每次消息被流消费时记录分区、消费者组 ID 和偏移量,我不知道如何将 process() 或 transform() 方法与 ProcessorContext 集成 API?如果我在我的 CustomParser class 中实现 Processor 接口,那么我将不得不实现所有方法,但我不确定这是否可行,就像记录元数据的融合文档中提到的那样 - https://docs.confluent.io/current/streams/developer-guide/processor-api.html#streams-developer-guide-processor-api

我已经在 spring-boot 应用程序中设置了 KStreams,如下所示(更改变量名称以供参考)

 @Bean
    public Set<KafkaStreams> myKStreamJson(StreamsBuilder profileBuilder) {
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        final KStream<String, JsonNode> pStream = myBuilder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde));

        Properties props = streamsConfig.kStreamsConfigs().asProperties();
       

        pstream
                .map((key, value) -> {
                            try {
                                return CustomParser.parse(key, value);
                            } catch (Exception e) {
                                LOGGER.error("Error occurred - " + e.getMessage());
                            }
                            return new KeyValue<>(null, null);
                        }
                )
                .filter((key, value) -> {
                    try {
                        return MessageFilter.filterNonNull(key, value);
                    } catch (Exception e) {
                        LOGGER.error("Error occurred - " + e.getMessage());
                    }
                    return false;
                })
                .through(
                        outputTopic,
                        Produced.with(Serdes.String(), new JsonPOJOSerde<>(TransformedMessage.class)));

        return Sets.newHashSet(
                new KafkaStreams(profileBuilder.build(), props)
        );
    }

实施Transformer;在 init() 中保存 ProcessorContext;然后,您可以访问 transform() 中的记录元数据,只需 return 原始 key/value.

这是一个example of a Transformer。它由 Spring 提供,供 Apache Kafka 调用 Spring 集成流来转换 key/value。