Kafka Streams: Implement a simple KeyValueStore where I can put and get data

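I have a Kafka Streams application that operates on incoming state and needs to store that state before writing to the next topic. The write should happen only after the state has been updated in the local store.

Something like this: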

stream.map(this::getAndUpdateState)
          .map(this::processStateAndEvent)
          .to("topicname");

So in getAndUpdateState() I could do:

state = store.get(key); // or new if null
state = updateState(state, event);  // update changes to state
store.put(key, state);  // write back the state
return state;
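Stripped of Kafka, the step above is a lookup with a default, an update, and a write-back before the state is returned. A minimal library-free sketch, assuming a plain `HashMap` stands in for the store and a hypothetical update rule that adds each event's length to a running total (neither is part of the Kafka Streams API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Library-free sketch: a HashMap stands in for the Kafka Streams state store.
public class GetAndUpdateSketch {

    // Hypothetical generic helper mirroring getAndUpdateState():
    // read the current state (or create a fresh one), apply the event, write it back, return it.
    static <K, S, E> S getAndUpdateState(Map<K, S> store, K key, E event,
                                         Supplier<S> initial,
                                         BiFunction<S, E, S> updateState) {
        S state = store.get(key);
        if (state == null) {
            state = initial.get();            // "or new if null"
        }
        state = updateState.apply(state, event);
        store.put(key, state);                // write back before anything goes downstream
        return state;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        // Hypothetical update rule: add the length of each event to the running total.
        BiFunction<Long, String, Long> addLength = (s, e) -> s + e.length();

        getAndUpdateState(store, "a", "hello", () -> 0L, addLength);
        long result = getAndUpdateState(store, "a", "hi", () -> 0L, addLength);
        System.out.println(result);           // prints 7
    }
}
```

Because the write-back happens before `return`, whatever the caller emits next is always based on state that is already in the store.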

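How can I implement simple get() and put() operations on a Kafka store? I have tried using a KeyValueStore, but it is problematic because I have to add a source processor, a sink processor, and so on.

Alternatively, any way to get from and put into Kafka using a KTable or some other concept would also be fine.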

It sounds like you want to do batch processing. Kafka Streams is a stream-processing library: all processors run in parallel (concurrently) to form a data pipeline.

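I guess you could still use transform() with an attached state store, emit nothing downstream, and only put the data into the store. You could then schedule a wall-clock-time punctuation that scans the whole store and sends all of its data downstream. Overall, however, this seems like an anti-pattern.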

The hard part of this idea is deciding when the state is "fully loaded": because a topic is by definition (conceptually) infinite, loading the state will "never" finish.
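The store-then-punctuate idea from this answer can be modelled without a broker. In a real Kafka Streams transformer the callback would be registered in `init()` through `ProcessorContext#schedule(Duration, PunctuationType.WALL_CLOCK_TIME, Punctuator)`, the scan would use the store's `all()` iterator, and records would leave through `context.forward(...)`. The sketch below replaces all of that with plain Java, assuming a `TreeMap` as the store, a `BiConsumer` as "downstream", and a `punctuate()` method that a hypothetical timer would call; the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Library-free model of "put into a store, emit nothing, flush on a punctuation".
// The TreeMap stands in for the state store; `downstream` stands in for context.forward().
public class BufferingModel {
    private final Map<String, Long> store = new TreeMap<>();
    private final BiConsumer<String, Long> downstream;

    BufferingModel(BiConsumer<String, Long> downstream) {
        this.downstream = downstream;
    }

    // Called per record: update the store only; nothing is sent downstream here.
    void process(String key, String value) {
        store.merge(key, (long) value.length(), Long::sum);
    }

    // Called by a (hypothetical) wall-clock timer: scan the whole store and emit every entry.
    void punctuate() {
        for (Map.Entry<String, Long> e : store.entrySet()) {
            downstream.accept(e.getKey(), e.getValue());
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        BufferingModel m = new BufferingModel((k, v) -> emitted.add(k + "=" + v));

        m.process("a", "hello");
        m.process("b", "hi");
        m.process("a", "x");
        System.out.println(emitted);   // [] : nothing emitted before the punctuation

        m.punctuate();
        System.out.println(emitted);   // [a=6, b=2]

        // The input never "ends", so every later punctuation re-emits the still-growing state.
        m.process("b", "abc");
        m.punctuate();
        System.out.println(emitted);   // [a=6, b=2, a=6, b=5]
    }
}
```

The last punctuation shows the "fully loaded" problem: each flush is only a snapshot, and nothing in the stream marks a final one.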

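Thanks to … and … for the suggestions.

I was able to do stateful processing in Kafka Streams using the transform() method. Below is the complete working code, based on the original Pipe example.

Pipe.java: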

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe{
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();

        //  setting Configs
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // initializing a StreamsBuilder for building the topology
        final StreamsBuilder builder = new StreamsBuilder();
        // creating a KStream that continuously reads records from its source Kafka topic "streams-plaintext-input"
        KStream<String, String> source = builder.stream("streams-plaintext-input");

        StoreBuilder<KeyValueStore<String, Long>> wordCountsStore = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("WordCountsStore"),
                Serdes.String(),
                Serdes.Long())
                .withCachingEnabled();

        builder.addStateStore(wordCountsStore);

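        // Re-key every record to the constant key "key" so all values share one counter.
        // Note: transform() does not auto-repartition after this key-changing map(), so with a
        // multi-partition input topic each stream task keeps its own "key" entry in its local store.
        source.map((k, v) -> KeyValue.pair("key", v))
                .peek((k, s) -> System.out.printf("After keying: %s, value: %s\n", k, s))
                .transform(new SampleTransformSupplier(wordCountsStore.name()), wordCountsStore.name())
                .peek((k, s) -> System.out.printf("After transform: %s, value: %s\n", k, s))
                // write the transformed records (emitted only after the store update) to "streams-pipe-output"
                .to("streams-pipe-output");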
        // generating the topology
        final Topology topology = builder.build();
        System.out.print(topology.describe());

        // constructing a streams client with the properties and topology
        final KafkaStreams streams = new KafkaStreams(topology, properties);
        final CountDownLatch latch = new CountDownLatch(1);

        // attaching shutdown handler
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run(){
                streams.close();
                latch.countDown();
            }
        });
        try{
            streams.start();
            latch.await();
        } catch (Throwable e){
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static class SampleTransformSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {

        private final String stateStoreName;

        public SampleTransformSupplier(String stateStoreName) {
            this.stateStoreName = stateStoreName;
        }

        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
            return new Transformer<String, String, KeyValue<String, String>>() {


                private KeyValueStore<String, Long> stateStore;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext processorContext) {
                    stateStore = (KeyValueStore<String, Long>) processorContext.getStateStore(stateStoreName);
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    Long countSoFar = stateStore.get(key);
                    if(countSoFar == null){
                        System.out.print("Initializing count so far. this message should be printed only once");
                        countSoFar = 0L;
                    }
                    countSoFar += value.length();
                    System.out.printf(" Key: %s, Value: %s, Count: %d\n\n", key, value, countSoFar);
                    stateStore.put(key, countSoFar);
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                    // Nothing to clean up: the state store is managed by Kafka Streams and must not be closed here.
                }
            };
        }
    }
}
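One caveat about the code above: per the `KStream#transform()` Javadoc, a key-changing `map()` upstream does not trigger automatic repartitioning before `transform()`; an explicit repartitioning step (for example `repartition()`, or `through()` in older versions) would be needed for that. If `streams-plaintext-input` has more than one partition, each stream task owns its own instance of `WordCountsStore`, so the constant key "key" gets one independent count per task rather than a single global count. This is also why the "should be printed only once" message can appear once per task. The library-free model below illustrates the effect, assuming two partitions and a hypothetical routing of records to them; it uses no Kafka classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Library-free model: each input partition (stream task) has its own local store.
// No Kafka classes are used; the partition count and routing are illustrative assumptions.
public class PerTaskStoreModel {

    // One HashMap per task, standing in for that task's WordCountsStore instance.
    private final List<Map<String, Long>> storesPerTask = new ArrayList<>();

    PerTaskStoreModel(int partitions) {
        for (int i = 0; i < partitions; i++) {
            storesPerTask.add(new HashMap<>());
        }
    }

    // Mirrors SampleTransformSupplier's transform(): running sum of value lengths per key,
    // but only within the store of the task that owns the record's input partition.
    long transform(int inputPartition, String key, String value) {
        Map<String, Long> store = storesPerTask.get(inputPartition);
        long countSoFar = store.getOrDefault(key, 0L) + value.length();
        store.put(key, countSoFar);
        return countSoFar;
    }

    public static void main(String[] args) {
        PerTaskStoreModel model = new PerTaskStoreModel(2);
        // Every record is re-keyed to "key", but without repartitioning it stays
        // in whichever input partition it was read from.
        model.transform(0, "key", "hello");           // task 0 count: 5
        model.transform(1, "key", "world!");          // task 1 count: 6
        long t0 = model.transform(0, "key", "ab");    // task 0 count: 7
        System.out.println(t0 + " vs single global count of 13");
    }
}
```

With a single-partition input topic (as in a local test setup) there is only one task, and the counts behave as the original code expects.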