spring-cloud-stream-binder-kstream 什么版本兼容Kafka 1.0.0
What version of spring-cloud-stream-binder-kstream is compatible with Kafka 1.0.0
当尝试 运行 略微改编的字数统计 example 版本时,出现 "No qualifying bean of type'org.apache.kafka.streams.kstream.KStreamBuilder'" 错误。在我的 POM 中,我使用 spring-cloud-stream-dependencies:Elmhurst.M3 来导入依赖项,它导入了 spring-cloud -stream-binder-kstream:2.0.0.M3.
但是,我认为我没有做任何特别的事情:
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class KafkaExampleSpringcloud1Application {
public static void main(String[] args) {
SpringApplication.run(KafkaExampleSpringcloud1Application.class,
args);
}
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "WordCounts")
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
static class WordCount {
private String word;
private long count;
private Date start;
private Date end;
WordCount(String word, long count, Date start, Date end) {
this.word = word;
this.count = count;
this.start = start;
this.end = end;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public Date getStart() {
return start;
}
public void setStart(Date start) {
this.start = start;
}
public Date getEnd() {
return end;
}
public void setEnd(Date end) {
this.end = end;
}
}
这是我的 application.yml:
application.name: kafka-example-01
spring.cloud.stream:
kstream:
configuration:
commit.interval.ms: 1000
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
timeWindow.length: 5000
binder:
brokers: localhost
zkNodes: localhost
kafka:
binder:
autoCreateTopics: false
bindings:
output.destination: word-count-output
input.destnation : word-count-input
我尝试使用旧版本,但我一直遇到 ClassNotFoundException 错误(例如 StreamsBuilder class)
目前还没有里程碑,只是 2.0。0.BUILD-SNAPSHOT。它将在 M4.
得到它与这些一起工作:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
当尝试 运行 略微改编的字数统计 example 版本时,出现 "No qualifying bean of type'org.apache.kafka.streams.kstream.KStreamBuilder'" 错误。在我的 POM 中,我使用 spring-cloud-stream-dependencies:Elmhurst.M3 来导入依赖项,它导入了 spring-cloud -stream-binder-kstream:2.0.0.M3.
但是,我认为我没有做任何特别的事情:
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class KafkaExampleSpringcloud1Application {
public static void main(String[] args) {
SpringApplication.run(KafkaExampleSpringcloud1Application.class,
args);
}
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "WordCounts")
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
static class WordCount {
private String word;
private long count;
private Date start;
private Date end;
WordCount(String word, long count, Date start, Date end) {
this.word = word;
this.count = count;
this.start = start;
this.end = end;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public Date getStart() {
return start;
}
public void setStart(Date start) {
this.start = start;
}
public Date getEnd() {
return end;
}
public void setEnd(Date end) {
this.end = end;
}
}
这是我的 application.yml:
application.name: kafka-example-01
spring.cloud.stream:
kstream:
configuration:
commit.interval.ms: 1000
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
timeWindow.length: 5000
binder:
brokers: localhost
zkNodes: localhost
kafka:
binder:
autoCreateTopics: false
bindings:
output.destination: word-count-output
input.destnation : word-count-input
我尝试使用旧版本,但我一直遇到 ClassNotFoundException 错误(例如 StreamsBuilder class)
目前还没有里程碑,只是 2.0。0.BUILD-SNAPSHOT。它将在 M4.
得到它与这些一起工作:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>