使用 Spring 云流无法在 Kafka Ktable 中看到任何消息
Not able to see any messages in the Kafka Ktable with Spring Cloud streamng
我使用 Spring 云流 API 编写了一个 Kafka 流应用程序,但无法在 KTable 中看到任何消息。我无法追踪问题。任何指点或帮助表示赞赏。
下面是代码
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class KafkaStreamsSample {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsSample.class, args);
}
//bin/confluent local produce user -- --property parse.key=true --property key.separator=~
//2~{"id": "2", "name": "john", "age": 43}
//1~{"id": "1", "name": "bob", "age": 44}
//3~{"id": "3", "name": "peter", "age": 45}
//4~{"id": "4", "name": "mark", "age": 46}
//2~{"id": "2", "name": "john", "age": 99}
//3~{"id": "3", "name": "paul", "age": 98}
public static class KStreamToTableJoinApplication {
@Bean
public Function<KStream<String, User>, KTable<String, User>> process() {
return input -> input
.groupByKey()
.reduce((aggValue, newValue) -> newValue, Materialized.as ("allusers"));
}
}
}
application.yml
spring.application.name: stream-global-sample
spring.cloud.stream.bindings.process-in-0:
destination: user
spring.cloud.stream.bindings.process-out-0:
destination: usertable
spring.cloud.stream.kafka.streams.bindings.process-out-0:
producer:
materializedAs: allusers
使用 Spring Cloud Stream Kafka Streams 活页夹,您不能将出站设置为 KTable
。它必须是 KStream
。将您的签名更改为:public Function<KStream<String, User>, KStream<String, User>>
。然后,在 reduce
通话中拨打 toStream()
。这会给你一个 KStream
到 return。这应该允许您在出站主题中看到输出。 reduce
操作的结果通过 KTable
实现到状态存储中。因此,如果您愿意,可以通过交互式查询直接查询该状态存储。
我使用 Spring 云流 API 编写了一个 Kafka 流应用程序,但无法在 KTable 中看到任何消息。我无法追踪问题。任何指点或帮助表示赞赏。
下面是代码
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class KafkaStreamsSample {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsSample.class, args);
}
//bin/confluent local produce user -- --property parse.key=true --property key.separator=~
//2~{"id": "2", "name": "john", "age": 43}
//1~{"id": "1", "name": "bob", "age": 44}
//3~{"id": "3", "name": "peter", "age": 45}
//4~{"id": "4", "name": "mark", "age": 46}
//2~{"id": "2", "name": "john", "age": 99}
//3~{"id": "3", "name": "paul", "age": 98}
public static class KStreamToTableJoinApplication {
@Bean
public Function<KStream<String, User>, KTable<String, User>> process() {
return input -> input
.groupByKey()
.reduce((aggValue, newValue) -> newValue, Materialized.as ("allusers"));
}
}
}
application.yml
spring.application.name: stream-global-sample
spring.cloud.stream.bindings.process-in-0:
destination: user
spring.cloud.stream.bindings.process-out-0:
destination: usertable
spring.cloud.stream.kafka.streams.bindings.process-out-0:
producer:
materializedAs: allusers
使用 Spring Cloud Stream Kafka Streams 活页夹,您不能将出站设置为 KTable
。它必须是 KStream
。将您的签名更改为:public Function<KStream<String, User>, KStream<String, User>>
。然后,在 reduce
通话中拨打 toStream()
。这会给你一个 KStream
到 return。这应该允许您在出站主题中看到输出。 reduce
操作的结果通过 KTable
实现到状态存储中。因此,如果您愿意,可以通过交互式查询直接查询该状态存储。