使用 Kafka Binder 在 Spring 云中打印 JsonObject

Printing JsonObject in Spring cloud using Kafka Binder

我是 Spring Cloud 和 kafka 流的新手。我正在尝试使用 kafka 活页夹设置 spring 云应用程序。我尝试在本地测试 kafka 流处理器,但我无法打印任何日志。

我的 kafka 消息将包含 JSONObject。 kafkaStreamListener class 是:

@Configuration
public class KafkaStreamListener {

    private static Logger logger = LogManager.getLogger(KafkaStreamListener.class);
    
    //bean for processing autonomous messages 
     @Bean
      public Function<KStream<String, JSONObject>, KStream<String, JSONObject>> autonomousProcessor() {
         System.out.println("start of stream processor%%%%%%%%%%%%%%%%%%%%%**************************");
         logger.info("inside processor");
         return kstream -> kstream.filter((key,value) -> {
         System.out.println(value.toString()); 
         return true;});
             }

Application.properties :

#Processor group with inputs and outputs
spring.cloud.stream.function.definition = autonomousProcessor
spring.cloud.stream.bindings.autonomousProcessor-in-0.destination = INPUT_TOPIC
spring.cloud.stream.bindings.autonomousProcessor-out-0.destination = OUTPUT_TOPIC
spring.cloud.stream.kafka.streams.binder.functions.autonomousProcessor.application-id= autonomousProcessorGroup

问题: 在 Debug 模式下,断点直接到达过滤步骤而不是没有任何动作。它会跳过记录器和 SOP。不确定可能是什么问题。 Spring云版:Hoxton.SR11

我认为您所看到的是正确的行为。您的函数将仅在 bootstrap 时间被活页夹调用一次,然后将调用初始 SOP 和记录器(再次仅调用一次)。如果您在启动应用程序时在第一个 SOP 或记录器上设置断点,您将看到它们被调用。然后当 Kafka 主题接收到数据时,将调用提供的 lambda(带过滤器)。过滤器中的内部 SOP 应在每次调用文件管理器时记录 value.toString()