How to add a custom StateStore to the Kafka Streams DSL processor?
For one of my Kafka Streams applications, I need to use features of both the DSL and the Processor API. The flow of my streaming application is:
source -> selectKey -> filter -> aggregate (on a window) -> sink
After the aggregation, I need to send a single aggregated message to the sink. So I defined my topology as follows:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
    .filterNot((k,v) -> k.equals("UnknownGroup"))
    .process(() -> new MyProcessor());
I defined a custom StateStore and registered it with my processor as shown below:
public class MyProcessor implements Processor<String, String> {
    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);
    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // processes all the messages in the state store and sends single aggregate message
    }

    @Override
    public void close() {
        invStore.close();
    }
}
When I run the application, I get a java.lang.NullPointerException:
Exception in thread "StreamThread-18" java.lang.NullPointerException
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Any idea what is going wrong here?
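You need to register the store outside of your Processor, using StreamsBuilder (or KStreamBuilder in older versions). First create the store, then register it with StreamsBuilder (KStreamBuilder), and finally provide the store name when you add the processor, which connects the processor to the store.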
StreamsBuilder builder = new StreamsBuilder();
// create store
StoreBuilder<KeyValueStore<String, HashMapStore>> storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("invStore"),
    Serdes.String(),
    invSerde);
// register store
builder.addStateStore(storeBuilder);
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
    .filterNot((k,v) -> k.equals("UnknownGroup"))
    .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
// older API:
KStreamBuilder builder = new KStreamBuilder();
// create store (build() already returns a StateStoreSupplier, so no cast is needed)
StateStoreSupplier storeSupplier = Stores.create("invStore")
    .withKeys(Serdes.String())
    .withValues(invSerde)
    .persistent()
    .build();
// register store
builder.addStateStore(storeSupplier);
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
    .filterNot((k,v) -> k.equals("UnknownGroup"))
    .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
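With the store registered on the builder, the processor would no longer create or register the store itself. In Kafka Streams it would normally look the store up by name in init(), via ProcessorContext.getStateStore("invStore"). The sketch below is not Kafka Streams code: it is a plain-Java model of that name-based wiring, and ToyBuilder, ToyContext and ToyProcessor are hypothetical stand-ins invented for illustration. It only shows the order of operations (register the store, connect it by name, look it up in init) and why a processor that was not connected to a store cannot obtain it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the wiring described above. None of the Toy* types are
// Kafka Streams classes; they are hypothetical stand-ins for illustration only.
class StoreWiringSketch {

    // Stand-in for the builder: it owns every store, keyed by store name.
    static class ToyBuilder {
        private final Map<String, Map<String, List<String>>> stores = new HashMap<>();

        // Create the store and register it under its name.
        void addStateStore(String storeName) {
            stores.put(storeName, new HashMap<>());
        }

        // Attach a processor and name the stores it is allowed to use.
        void process(ToyProcessor processor, String... storeNames) {
            Map<String, Map<String, List<String>>> connected = new HashMap<>();
            for (String name : storeNames) {
                Map<String, List<String>> store = stores.get(name);
                if (store == null) {
                    throw new IllegalArgumentException("Store not registered: " + name);
                }
                connected.put(name, store);
            }
            processor.init(new ToyContext(connected));
        }
    }

    // Stand-in for the processor context: it exposes only the connected stores.
    static class ToyContext {
        private final Map<String, Map<String, List<String>>> connected;

        ToyContext(Map<String, Map<String, List<String>>> connected) {
            this.connected = connected;
        }

        Map<String, List<String>> getStateStore(String name) {
            Map<String, List<String>> store = connected.get(name);
            if (store == null) {
                throw new IllegalStateException("Store not connected to this processor: " + name);
            }
            return store;
        }
    }

    // Stand-in for MyProcessor: it looks the store up by name instead of building it.
    static class ToyProcessor {
        private Map<String, List<String>> invStore;

        void init(ToyContext context) {
            invStore = context.getStateStore("invStore");
        }

        // Buffers each message under its key.
        void process(String key, String message) {
            invStore.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
        }

        // Removes everything buffered for a key and joins it into one message.
        String aggregate(String key) {
            List<String> messages = invStore.remove(key);
            return messages == null ? "" : String.join("|", messages);
        }
    }

    public static void main(String[] args) {
        ToyBuilder builder = new ToyBuilder();
        builder.addStateStore("invStore");          // register store
        ToyProcessor processor = new ToyProcessor();
        builder.process(processor, "invStore");     // connect store by name
        processor.process("groupA", "m1");
        processor.process("groupA", "m2");
        System.out.println(processor.aggregate("groupA")); // prints "m1|m2"
    }
}
```

In this model, calling builder.process(processor) without the store name makes init() fail immediately, which mirrors the role of the "invStore" argument passed to process(...) in the answer.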