What version of spring-cloud-stream-binder-kstream is compatible with Kafka 1.0.0?

When I try to run a slightly adapted version of the word count example, I get a "No qualifying bean of type 'org.apache.kafka.streams.kstream.KStreamBuilder'" error. In my POM, I use spring-cloud-stream-dependencies:Elmhurst.M3 to import dependencies, which pulls in spring-cloud-stream-binder-kstream:2.0.0.M3.

However, I don't think I am doing anything special:

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class KafkaExampleSpringcloud1Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleSpringcloud1Application.class, args);
    }

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<Object, String> input) {

        return input
                // Split on runs of non-word characters; the backslash must be escaped in a Java string literal.
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serdes.String(), Serdes.String())
                .count(timeWindows, "WordCounts")
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }



    static class WordCount {

        private String word;

        private long count;

        private Date start;

        private Date end;

        WordCount(String word, long count, Date start, Date end) {
            this.word = word;
            this.count = count;
            this.start = start;
            this.end = end;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }

        public Date getStart() {
            return start;
        }

        public void setStart(Date start) {
            this.start = start;
        }

        public Date getEnd() {
            return end;
        }

        public void setEnd(Date end) {
            this.end = end;
        }
    }
}
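The tokenizing step of `process` can be checked on its own, without Kafka or Spring. The following is a minimal sketch, assuming the intended pattern is `\W+` written as `"\\W+"` in Java source; the class name `WordSplitSketch` and the helper `tokenize` are hypothetical and not part of the original application.

```java
import java.util.Arrays;
import java.util.List;

public class WordSplitSketch {

    // Hypothetical helper mirroring the lambda passed to flatMapValues:
    // lower-case the value, then split on runs of non-word characters.
    static List<String> tokenize(String value) {
        return Arrays.asList(value.toLowerCase().split("\\W+"));
    }

    public static void main(String[] args) {
        // prints [hello, kafka, streams, world]
        System.out.println(tokenize("Hello, Kafka Streams world"));
    }
}
```

Note that an input starting with a non-word character yields an empty first token, so a real topology may want to filter out empty strings before counting.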

Here is my application.yml:

application.name: kafka-example-01
spring.cloud.stream:
  kstream:
    configuration:
      commit.interval.ms: 1000
      key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    timeWindow.length: 5000
    binder:
      brokers: localhost
      zkNodes: localhost
  kafka:
    binder:
      autoCreateTopics: false
  bindings:
    output.destination: word-count-output
    input.destination: word-count-input

I tried using older versions, but I keep running into ClassNotFoundException errors (for example, for the StreamsBuilder class).

There is no milestone for this yet, only 2.0.0.BUILD-SNAPSHOT. It will be in M4.

Commit here.
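Both errors in the question are consistent with a Kafka Streams API mismatch: Kafka 1.0.0 introduced `org.apache.kafka.streams.StreamsBuilder` and deprecated `org.apache.kafka.streams.kstream.KStreamBuilder`. One way to see which of the two a given dependency set actually puts on the classpath is a small probe like the sketch below. The class name `KafkaStreamsApiProbe` and the helper `isPresent` are hypothetical, and the probe only reports class availability; it does not say which builder the binder expects.

```java
public class KafkaStreamsApiProbe {

    // Returns true if the named class can be loaded, without initializing it.
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, KafkaStreamsApiProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] candidates = {
            "org.apache.kafka.streams.StreamsBuilder",         // added in Kafka 1.0.0
            "org.apache.kafka.streams.kstream.KStreamBuilder"  // older builder, deprecated in 1.0.0
        };
        for (String name : candidates) {
            System.out.println(name + " -> " + (isPresent(name) ? "present" : "missing"));
        }
    }
}
```

Running it with the same dependencies as the application shows whether the kafka-streams version resolved by Maven matches the one you expect.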

I got it working with these dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kstream</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.1.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>