如何使用 Kafka Streams 将数据写入 Redis 自定义状态存储
How do you write data into a Redis custom state store using Kafka Streams
我最近一直在学习如何使用 Kafka Streams 客户端,我一直在努力解决的一件事是如何使用 Redis 之类的东西从默认状态存储 (RocksDB) 切换到自定义状态存储. Confluent 文档清楚地表明您必须为您的自定义商店实现 StateStore 接口,并且您必须提供 StoreBuilder 的实现以创建该商店的实例。
这是我目前为我的定制商店准备的东西。我还添加了一个简单的写入方法,通过 Redis XADD 命令将新条目附加到指定的流中。
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
private String name;
private volatile boolean open = false;
private boolean loggingEnabled = false;
public MyCustomStore(String name, boolean loggingEnabled) {
this.name = name;
this.loggingEnabled = loggingEnabled;
}
@Override
public String name() {
return this.name;
}
@Override
public void init(ProcessorContext context, StateStore root) {
if (root != null) {
// register the store
context.register(root, (key, value) -> {
write(key.toString(), value.toString());
});
}
this.open = true;
}
@Override
public void flush() {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public boolean persistent() {
// TODO Auto-generated method stub
return true;
}
@Override
public boolean isOpen() {
// TODO Auto-generated method stub
return false;
}
@Override
public void write(String key, String value) {
try(Jedis jedis = new Jedis("localhost", 6379)) {
Map<String, String> hash = new HashMap<>();
hash.put(key, value);
jedis.xadd("MyStream", StreamEntryID.NEW_ENTRY, hash);
}
}
}
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<String,String>> {
private boolean cached = true;
private String name;
private Map<String,String> logConfig=new HashMap<>();
private boolean loggingEnabled;
public MyCustomStoreBuilder(String name, boolean loggingEnabled){
this.name = name;
this.loggingEnabled = loggingEnabled;
}
@Override
public StoreBuilder<MyCustomStore<String,String>> withCachingEnabled() {
this.cached = true;
return this;
}
@Override
public StoreBuilder<MyCustomStore<String,String>> withCachingDisabled() {
this.cached = false;
return null;
}
@Override
public StoreBuilder<MyCustomStore<String,String>> withLoggingEnabled(Map<String, String> config) {
loggingEnabled=true;
return this;
}
@Override
public StoreBuilder<MyCustomStore<String,String>> withLoggingDisabled() {
this.loggingEnabled = false;
return this;
}
@Override
public MyCustomStore<String,String> build() {
return new MyCustomStore<String,String>(this.name, this.loggingEnabled);
}
@Override
public Map<String, String> logConfig() {
return logConfig;
}
@Override
public boolean loggingEnabled() {
return loggingEnabled;
}
@Override
public String name() {
return name;
}
}
这是我的设置和拓扑结构。
@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
final Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Long().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Double().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "data");
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, appServerConfig);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
props.put(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final String storeName = "the-custome-store";
Topology topology = new Topology();
// Create CustomStoreSupplier for store name the-custom-store
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder(storeName, false);
topology.addSource("input","inputTopic");
topology.addProcessor("redis-processor", () -> new RedisProcessor(storeName), "input");
topology.addStateStore(customStoreBuilder, "redis-processor");
KafkaStreams kafkaStreams = new KafkaStreams(topology, props);
kafkaStreams.start();
return kafkaStreams;
}
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<String,String>> {
@Override
public boolean accepts(StateStore stateStore) {
return stateStore instanceof MyCustomStore;
}
@Override
public MyReadableCustomStore<String,String> create(final StateStoreProvider storeProvider, final String storeName) {
return new MyCustomStoreTypeWrapper<>(storeProvider, storeName, this);
}
}
public class MyCustomStoreTypeWrapper<K,V> implements MyReadableCustomStore<K,V> {
private final QueryableStoreType<MyReadableCustomStore<String, String>> customStoreType;
private final String storeName;
private final StateStoreProvider provider;
public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
final String storeName,
final QueryableStoreType<MyReadableCustomStore<String, String>> customStoreType) {
this.provider = provider;
this.storeName = storeName;
this.customStoreType = customStoreType;
}
@Override
public String read(String key) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
StreamEntryID start = new StreamEntryID(0, 0);
StreamEntryID end = null; // null -> until the last item in the stream
int count = 2;
List<StreamEntry> list = jedis.xrange("MyStream", start, end, count);
if (list != null) {
// Get the most recently added item, which is also the last item
StreamEntry streamData = list.get(list.size() - 1);
return streamData.toString();
} else {
System.out.println("No new data in the stream");
}
return "";
}
}
}
// This throws the InvalidStateStoreException when I try to get access to the custom store
MyReadableCustomStore<String,String> store = streams.store("the-custome-store", new MyCustomStoreType<String,String>());
String value = store.read("testKey");
所以,我的问题是我现在如何真正让状态存储数据持久保存到 Redis 中?我觉得我在状态存储初始化或 StateRestoreCallback 中遗漏了一些东西。如有任何帮助或澄清,我们将不胜感激。
在我看来,您已将商店正确连接到拓扑结构。但是您没有任何处理器使用商店。
它可能看起来像这样:
final String storeName = "the-custome-store";
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder(storeName, false);
Topology topology = new Topology()
topology.addSource("input", "input-topic");
// makes the processor a child of the source node
// the source node forwards its records to the child processor node
topology.addProcessor("redis-processor", () -> new RedisProcessor(storeName), "input");
// add the store and specify the processor(s) that access the store
topology.addStateStore(storeBuilder, "redis-processor");
class RedisProcessor implements Processor<byte[], byte[]> {
final String storeName;
MyCustomStore<byte[],byte[]> stateStore;
public RedisProcessor(String storeName) {
this.storeName = storeName;
}
@Override
public void init(ProcessorContext context) {
stateStore = (MyCustomeStore<byte[], byte[]>) context.getStateStore(storeName);
}
@Override
public void process(byte[] key, byte[] value) {
stateStore.write(key, value);
}
@Override
public void close() {
}
}
HTH,告诉我你的结果如何。
更新评论回答:
我认为您需要将 MyCustomStore.isOpen()
更新为 return open
变量。
现在它被硬编码为 return false
Override
public boolean isOpen() {
// TODO Auto-generated method stub
return false;
}
我最近一直在学习如何使用 Kafka Streams 客户端,我一直在努力解决的一件事是如何使用 Redis 之类的东西从默认状态存储 (RocksDB) 切换到自定义状态存储. Confluent 文档清楚地表明您必须为您的自定义商店实现 StateStore 接口,并且您必须提供 StoreBuilder 的实现以创建该商店的实例。
这是我目前为我的定制商店准备的东西。我还添加了一个简单的写入方法,通过 Redis XADD 命令将新条目附加到指定的流中。
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
private String name;
private volatile boolean open = false;
private boolean loggingEnabled = false;
public MyCustomStore(String name, boolean loggingEnabled) {
this.name = name;
this.loggingEnabled = loggingEnabled;
}
@Override
public String name() {
return this.name;
}
@Override
public void init(ProcessorContext context, StateStore root) {
if (root != null) {
// register the store
context.register(root, (key, value) -> {
write(key.toString(), value.toString());
});
}
this.open = true;
}
@Override
public void flush() {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public boolean persistent() {
// TODO Auto-generated method stub
return true;
}
@Override
public boolean isOpen() {
// TODO Auto-generated method stub
return false;
}
@Override
public void write(String key, String value) {
try(Jedis jedis = new Jedis("localhost", 6379)) {
Map<String, String> hash = new HashMap<>();
hash.put(key, value);
jedis.xadd("MyStream", StreamEntryID.NEW_ENTRY, hash);
}
}
}
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<String,String>> {
private boolean cached = true;
private String name;
private Map<String,String> logConfig=new HashMap<>();
private boolean loggingEnabled;
public MyCustomStoreBuilder(String name, boolean loggingEnabled){
this.name = name;
this.loggingEnabled = loggingEnabled;
}
@Override
public StoreBuilder<MyCustomStore<String,String>> withCachingEnabled() {
this.cached = true;
return this;
}
@Override
public StoreBuilder<MyCustomStore<String,String>> withCachingDisabled() {
this.cached = false;
return null;
}
@Override
public StoreBuilder<MyCustomStore<String,String>> withLoggingEnabled(Map<String, String> config) {
loggingEnabled=true;
return this;
}
@Override
public StoreBuilder<MyCustomStore<String,String>> withLoggingDisabled() {
this.loggingEnabled = false;
return this;
}
@Override
public MyCustomStore<String,String> build() {
return new MyCustomStore<String,String>(this.name, this.loggingEnabled);
}
@Override
public Map<String, String> logConfig() {
return logConfig;
}
@Override
public boolean loggingEnabled() {
return loggingEnabled;
}
@Override
public String name() {
return name;
}
}
这是我的设置和拓扑结构。
@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
final Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Long().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Double().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "data");
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, appServerConfig);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
props.put(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final String storeName = "the-custome-store";
Topology topology = new Topology();
// Create CustomStoreSupplier for store name the-custom-store
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder(storeName, false);
topology.addSource("input","inputTopic");
topology.addProcessor("redis-processor", () -> new RedisProcessor(storeName), "input");
topology.addStateStore(customStoreBuilder, "redis-processor");
KafkaStreams kafkaStreams = new KafkaStreams(topology, props);
kafkaStreams.start();
return kafkaStreams;
}
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<String,String>> {
@Override
public boolean accepts(StateStore stateStore) {
return stateStore instanceof MyCustomStore;
}
@Override
public MyReadableCustomStore<String,String> create(final StateStoreProvider storeProvider, final String storeName) {
return new MyCustomStoreTypeWrapper<>(storeProvider, storeName, this);
}
}
public class MyCustomStoreTypeWrapper<K,V> implements MyReadableCustomStore<K,V> {
private final QueryableStoreType<MyReadableCustomStore<String, String>> customStoreType;
private final String storeName;
private final StateStoreProvider provider;
public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
final String storeName,
final QueryableStoreType<MyReadableCustomStore<String, String>> customStoreType) {
this.provider = provider;
this.storeName = storeName;
this.customStoreType = customStoreType;
}
@Override
public String read(String key) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
StreamEntryID start = new StreamEntryID(0, 0);
StreamEntryID end = null; // null -> until the last item in the stream
int count = 2;
List<StreamEntry> list = jedis.xrange("MyStream", start, end, count);
if (list != null) {
// Get the most recently added item, which is also the last item
StreamEntry streamData = list.get(list.size() - 1);
return streamData.toString();
} else {
System.out.println("No new data in the stream");
}
return "";
}
}
}
// This throws the InvalidStateStoreException when I try to get access to the custom store
MyReadableCustomStore<String,String> store = streams.store("the-custome-store", new MyCustomStoreType<String,String>());
String value = store.read("testKey");
所以,我的问题是我现在如何真正让状态存储数据持久保存到 Redis 中?我觉得我在状态存储初始化或 StateRestoreCallback 中遗漏了一些东西。如有任何帮助或澄清,我们将不胜感激。
在我看来,您已将商店正确连接到拓扑结构。但是您没有任何处理器使用商店。
它可能看起来像这样:
final String storeName = "the-custome-store";
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder(storeName, false);
Topology topology = new Topology()
topology.addSource("input", "input-topic");
// makes the processor a child of the source node
// the source node forwards its records to the child processor node
topology.addProcessor("redis-processor", () -> new RedisProcessor(storeName), "input");
// add the store and specify the processor(s) that access the store
topology.addStateStore(storeBuilder, "redis-processor");
class RedisProcessor implements Processor<byte[], byte[]> {
final String storeName;
MyCustomStore<byte[],byte[]> stateStore;
public RedisProcessor(String storeName) {
this.storeName = storeName;
}
@Override
public void init(ProcessorContext context) {
stateStore = (MyCustomeStore<byte[], byte[]>) context.getStateStore(storeName);
}
@Override
public void process(byte[] key, byte[] value) {
stateStore.write(key, value);
}
@Override
public void close() {
}
}
HTH,告诉我你的结果如何。
更新评论回答:
我认为您需要将 MyCustomStore.isOpen()
更新为 return open
变量。
现在它被硬编码为 return false
Override
public boolean isOpen() {
// TODO Auto-generated method stub
return false;
}