从卡夫卡流中的调用函数获取状态存储数据
getting statestore data from called function in kafka streams
在 Kafka Streams 的处理器 API 中,我可以将处理器上下文从 init()
如下传递到其他函数,并通过 process()
中的状态存储取回上下文吗?
public void init(ProcessorContext context) {
this.context = context;
String resourceName = "config.properties";
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Properties props = new Properties();
try(InputStream resourceStream = loader.getResourceAsStream(resourceName)) {
props.load(resourceStream);
}
catch(IOException e){
e.printStackTrace();
}
dataSplitter.timerMessageSource(props, context);//can I pass context like this?
this.context.schedule(1000);
// retrieve the key-value store named "patient"
kvStore = (KeyValueStore<String, PatientDataSummary>) this.context.getStateStore("patient");
//want to get the value of statestore filled by the called function timerMessageSource(), as the data to be put in statestore is getting generated in timerMessageSource()
//is there any way I can get that by using context or so
}
ProcessorContext
的使用有些限制,您不能在任意时间调用提供的每个方法。因此,这取决于您如何使用它——通常,您可以随意传递它(它在处理器的整个生命周期内始终是同一个对象)。
如果我对您的问题的理解正确,您注册了一个标点符号并在标点符号回调中使用了您的 dataSplitter
并且想要修改商店。这是绝对可能的——您可以将存储放入一个 class 成员,类似于您对上下文所做的那样,或者使用上下文对象在标点回调中获取存储。
在 Kafka Streams 的处理器 API 中,我可以将处理器上下文从 init()
如下传递到其他函数,并通过 process()
中的状态存储取回上下文吗?
public void init(ProcessorContext context) {
this.context = context;
String resourceName = "config.properties";
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Properties props = new Properties();
try(InputStream resourceStream = loader.getResourceAsStream(resourceName)) {
props.load(resourceStream);
}
catch(IOException e){
e.printStackTrace();
}
dataSplitter.timerMessageSource(props, context);//can I pass context like this?
this.context.schedule(1000);
// retrieve the key-value store named "patient"
kvStore = (KeyValueStore<String, PatientDataSummary>) this.context.getStateStore("patient");
//want to get the value of statestore filled by the called function timerMessageSource(), as the data to be put in statestore is getting generated in timerMessageSource()
//is there any way I can get that by using context or so
}
ProcessorContext
的使用有些限制,您不能在任意时间调用提供的每个方法。因此,这取决于您如何使用它——通常,您可以随意传递它(它在处理器的整个生命周期内始终是同一个对象)。
如果我对您的问题的理解正确,您注册了一个标点符号并在标点符号回调中使用了您的 dataSplitter
并且想要修改商店。这是绝对可能的——您可以将存储放入一个 class 成员,类似于您对上下文所做的那样,或者使用上下文对象在标点回调中获取存储。