从卡夫卡流中的调用函数获取状态存储数据

getting statestore data from called function in kafka streams

在 Kafka Streams 的处理器 API 中,我可以将处理器上下文从 init() 如下传递到其他函数,并通过 process() 中的状态存储取回上下文吗?

public void init(ProcessorContext context) {
    this.context = context;

    String resourceName = "config.properties"; 
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    Properties props = new Properties();
    try(InputStream resourceStream = loader.getResourceAsStream(resourceName)) {
        props.load(resourceStream);
    }
    catch(IOException e){
        e.printStackTrace();
    }

    dataSplitter.timerMessageSource(props, context);//can I pass context like this?

    this.context.schedule(1000);

    // retrieve the key-value store named "patient"
    kvStore = (KeyValueStore<String, PatientDataSummary>) this.context.getStateStore("patient"); 
    //want to get the value of statestore filled by the called function timerMessageSource(), as the data to be put in statestore is getting generated in timerMessageSource()
    //is there any way I can get that by using context or so    
}

ProcessorContext的使用有些限制,您不能在任意时间调用提供的每个方法。因此,这取决于您如何使用它——通常,您可以随意传递它(它在处理器的整个生命周期内始终是同一个对象)。

如果我对您的问题的理解正确,您注册了一个标点符号并在标点符号回调中使用了您的 dataSplitter 并且想要修改商店。这是绝对可能的——您可以将存储放入一个 class 成员,类似于您对上下文所做的那样,或者使用上下文对象在标点回调中获取存储。