如何使用 Kafka Streams 将数据写入 Redis 自定义状态存储

How do you write data into a Redis custom state store using Kafka Streams

我最近一直在学习如何使用 Kafka Streams 客户端,一直困扰我的一个问题是:如何从默认状态存储 (RocksDB) 切换到基于 Redis 之类的自定义状态存储。Confluent 文档明确指出,您必须为自定义存储实现 StateStore 接口,并且必须提供 StoreBuilder 的实现来创建该存储的实例。

这是我目前为自定义存储编写的代码。我还添加了一个简单的 write 方法,通过 Redis XADD 命令将新条目追加到指定的流中。

/**
 * Custom Kafka Streams state store that appends each written entry to the
 * Redis stream {@code "MyStream"} via the XADD command.
 *
 * <p>The store tracks its own open/closed state so that {@link #isOpen()}
 * reflects whether {@link #init} has run and {@link #close()} has not.
 *
 * @param <K> key type declared by the store (writes themselves take strings)
 * @param <V> value type declared by the store (writes themselves take strings)
 */
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
    private final String name;
    // volatile: the flag is written in init()/close() and read in isOpen()
    private volatile boolean open = false;
    private final boolean loggingEnabled;

    /**
     * @param name           name under which the store is registered
     * @param loggingEnabled logging flag handed over by the store builder
     */
    public MyCustomStore(String name, boolean loggingEnabled) {
        this.name = name;
        this.loggingEnabled = loggingEnabled;
    }

    /** @return the name given at construction time */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * Registers the store with the processor context (when a root store is
     * supplied) and marks it open.
     *
     * @param context processor context the store is registered with
     * @param root    root store passed to {@code context.register}; registration
     *                is skipped when it is {@code null}
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        if (root != null) {
            // The restore callback receives raw byte[] key/value pairs; decode
            // them as UTF-8 text instead of calling toString() on the arrays,
            // which would only yield an identity string such as "[B@1a2b3c".
            context.register(root, (key, value) -> {
                if (key == null || value == null) {
                    // nothing that can be appended to the stream
                    return;
                }
                write(new String(key, java.nio.charset.StandardCharsets.UTF_8),
                      new String(value, java.nio.charset.StandardCharsets.UTF_8));
            });
        }

        this.open = true;
    }

    /** No-op: every write goes straight to Redis, so nothing is buffered here. */
    @Override
    public void flush() {
        // intentionally empty
    }

    /** Marks the store closed; no connection is held between writes. */
    @Override
    public void close() {
        this.open = false;
    }

    /** @return always {@code true} */
    @Override
    public boolean persistent() {
        return true;
    }

    /** @return {@code true} after {@link #init} has run and before {@link #close()} */
    @Override
    public boolean isOpen() {
        return this.open;
    }

    /**
     * Appends a single-field entry {@code key -> value} to the Redis stream
     * {@code "MyStream"} using a fresh connection to {@code localhost:6379}.
     *
     * @param key   field name of the stream entry
     * @param value field value of the stream entry
     */
    @Override
    public void write(String key, String value) {
        // try-with-resources closes the connection after the single XADD
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Map<String, String> hash = new HashMap<>();
            hash.put(key, value);
            jedis.xadd("MyStream", StreamEntryID.NEW_ENTRY, hash);
        }
    }
}

/**
 * {@link StoreBuilder} that creates {@link MyCustomStore} instances with
 * {@code String} keys and values.
 *
 * <p>All {@code with...} methods return this builder so calls can be chained.
 */
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<String,String>> {

    // Recorded for completeness; build() does not currently consume this flag.
    private boolean cached = true;
    private final String name;

    private Map<String,String> logConfig = new HashMap<>();
    private boolean loggingEnabled;

    /**
     * @param name           name of the store to build
     * @param loggingEnabled initial logging flag passed on to the built store
     */
    public MyCustomStoreBuilder(String name, boolean loggingEnabled){
        this.name = name;
        this.loggingEnabled = loggingEnabled;
    }

    /** Turns the caching flag on. @return this builder */
    @Override
    public StoreBuilder<MyCustomStore<String,String>> withCachingEnabled() {
        this.cached = true;
        return this;
    }

    /** Turns the caching flag off. @return this builder */
    @Override
    public StoreBuilder<MyCustomStore<String,String>> withCachingDisabled() {
        this.cached = false;
        // return the builder (not null) so fluent chaining keeps working
        return this;
    }

    /**
     * Turns logging on and keeps a copy of the supplied configuration so that
     * {@link #logConfig()} reports it.
     *
     * @param config logging configuration; {@code null} is treated as empty
     * @return this builder
     */
    @Override
    public StoreBuilder<MyCustomStore<String,String>> withLoggingEnabled(Map<String, String> config) {
        this.loggingEnabled = true;
        // defensive copy: later changes to the caller's map must not leak in
        this.logConfig = (config == null) ? new HashMap<>() : new HashMap<>(config);
        return this;
    }

    /** Turns logging off. @return this builder */
    @Override
    public StoreBuilder<MyCustomStore<String,String>> withLoggingDisabled() {
        this.loggingEnabled = false;
        return this;
    }

    /** @return a new store carrying this builder's name and logging flag */
    @Override
    public MyCustomStore<String,String> build() {
        return new MyCustomStore<String,String>(this.name, this.loggingEnabled);
    }

    /** @return the logging configuration held by this builder */
    @Override
    public Map<String, String> logConfig() {
        return logConfig;
    }

    /** @return whether logging is currently enabled on this builder */
    @Override
    public boolean loggingEnabled() {
        return loggingEnabled;
    }

    /** @return the name of the store this builder creates */
    @Override
    public String name() {
        return name;
    }
}

这是我的设置和拓扑结构。

@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
    // Name shared by the store builder and the processor that looks the store up.
    final String storeName = "the-custome-store";

    // --- Streams client configuration ---
    final Properties streamsConfig = new Properties();
    streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
    streamsConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, appServerConfig);
    streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, "data");
    streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Long().getClass());
    streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Double().getClass());
    streamsConfig.put(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    streamsConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    streamsConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
    streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // --- Topology: source -> processor, with the custom store attached to the processor ---
    MyCustomStoreBuilder storeBuilder = new MyCustomStoreBuilder(storeName, false);

    Topology graph = new Topology();
    graph.addSource("input", "inputTopic");
    graph.addProcessor("redis-processor", () -> new RedisProcessor(storeName), "input");
    graph.addStateStore(storeBuilder, "redis-processor");

    // Start the client before handing it out as a bean.
    KafkaStreams streams = new KafkaStreams(graph, streamsConfig);
    streams.start();
    return streams;
}
/**
 * Queryable store type that matches {@link MyCustomStore} instances and exposes
 * them through the read-only {@link MyReadableCustomStore} view.
 */
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<String,String>> {

    /**
     * @param stateStore candidate store
     * @return {@code true} only when the candidate is a {@link MyCustomStore}
     */
    @Override
    public boolean accepts(StateStore stateStore) {
        final boolean isCustomStore = stateStore instanceof MyCustomStore;
        return isCustomStore;
    }

    /**
     * Wraps the provider and store name in a readable facade.
     *
     * @param storeProvider provider handed to the wrapper
     * @param storeName     name of the store handed to the wrapper
     * @return a new {@link MyCustomStoreTypeWrapper} bound to this store type
     */
    @Override
    public MyReadableCustomStore<String,String> create(final StateStoreProvider storeProvider, final String storeName) {
        final MyReadableCustomStore<String,String> readableView =
                new MyCustomStoreTypeWrapper<>(storeProvider, storeName, this);
        return readableView;
    }
}
/**
 * Read-only facade over the custom store. Reads go directly to the Redis
 * stream {@code "MyStream"} on {@code localhost:6379}; the provider, store
 * name and store type are kept but not consulted by {@link #read}.
 */
public class MyCustomStoreTypeWrapper<K,V> implements MyReadableCustomStore<K,V> {
    private final QueryableStoreType<MyReadableCustomStore<String, String>> customStoreType;
    private final String storeName;
    private final StateStoreProvider provider;

    /**
     * @param provider        state store provider supplied by the store type
     * @param storeName       name of the store being queried
     * @param customStoreType store type that created this wrapper
     */
    public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
                                    final String storeName,
                                    final QueryableStoreType<MyReadableCustomStore<String, String>> customStoreType) {

        this.provider = provider;
        this.storeName = storeName;
        this.customStoreType = customStoreType;
    }

    /**
     * Fetches up to two entries from the start of the stream and returns the
     * string form of the last one fetched.
     *
     * @param key currently unused; the lookup is not keyed
     * @return the entry's string form, or {@code ""} when the stream yields no entries
     */
    @Override
    public String read(String key) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            StreamEntryID start = new StreamEntryID(0, 0);
            StreamEntryID end = null; // null -> until the last item in the stream
            int count = 2;
            List<StreamEntry> list = jedis.xrange("MyStream", start, end, count);

            // An empty (not just null) result must be handled too; otherwise
            // list.get(-1) throws IndexOutOfBoundsException on an empty stream.
            if (list == null || list.isEmpty()) {
                System.out.println("No new data in the stream");
                return "";
            }

            // Last entry of the fetched page.
            // NOTE(review): with count = 2 starting at 0-0 this is at most the
            // second entry of the stream, not necessarily the newest — confirm intent.
            StreamEntry streamData = list.get(list.size() - 1);
            return streamData.toString();
        }
    }
}
// Looking the custom store up is where InvalidStateStoreException gets thrown.
MyCustomStoreType<String,String> customStoreType = new MyCustomStoreType<String,String>();
MyReadableCustomStore<String,String> store = streams.store("the-custome-store", customStoreType);
String value = store.read("testKey");

所以,我的问题是我现在如何真正让状态存储数据持久保存到 Redis 中?我觉得我在状态存储初始化或 StateRestoreCallback 中遗漏了一些东西。如有任何帮助或澄清,我们将不胜感激。

在我看来,您已将存储正确连接到拓扑结构,但是您没有任何处理器使用该存储。

它可能看起来像这样:

final String storeName = "the-custome-store";
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder(storeName, false);

Topology topology = new Topology();
topology.addSource("input", "input-topic");

// makes the processor a child of the source node
// the source node forwards its records to the child processor node
topology.addProcessor("redis-processor", () -> new RedisProcessor(storeName), "input");

// add the store and specify the processor(s) that access the store
// (pass the builder declared above; no variable named storeBuilder exists)
topology.addStateStore(customStoreBuilder, "redis-processor");


/**
 * Processor that forwards every incoming record to the custom Redis-backed
 * state store registered under the given store name.
 */
class RedisProcessor implements Processor<byte[], byte[]> {
    final String storeName;
    MyCustomStore<String, String> stateStore;

    /**
     * @param storeName name of the state store to look up in {@link #init}
     */
    public RedisProcessor(String storeName) {
        this.storeName = storeName;
    }

    /**
     * Looks up the custom store by name from the processor context.
     *
     * @param context processor context that owns the store
     */
    @Override
    @SuppressWarnings("unchecked") // the builder creates MyCustomStore<String,String>
    public void init(ProcessorContext context) {
        stateStore = (MyCustomStore<String, String>) context.getStateStore(storeName);
    }

    /**
     * Decodes the raw key and value as UTF-8 text and writes them to the store,
     * whose write method takes strings. Records with a null key or value are
     * skipped because there is nothing to decode.
     *
     * @param key   raw record key
     * @param value raw record value
     */
    @Override
    public void process(byte[] key, byte[] value) {
        if (key == null || value == null) {
            return;
        }
        stateStore.write(new String(key, java.nio.charset.StandardCharsets.UTF_8),
                         new String(value, java.nio.charset.StandardCharsets.UTF_8));
    }

    /** Nothing to release; the store manages its own lifecycle. */
    @Override
    public void close() {
        // intentionally empty
    }
}

HTH,告诉我你的结果如何。

更新评论回答:

我认为您需要将 MyCustomStore.isOpen() 改为返回 open 变量的值。目前它被硬编码为返回 false:


Override
  public boolean isOpen() {
   // TODO Auto-generated method stub
   return false;
 }