新建 KTable returns 无
Newly build KTable returns nothing
我正在尝试使用 KTable 来消费来自 Kafka 主题的事件。但是,它 returns 什么都没有。当我使用 KStream 时,它 returns 并打印对象。这真是奇怪。 Producer and Consumer can be found here
//Not working
KTable<String, Customer> customerKTable = streamsBuilder.table("customer", Consumed.with(Serdes.String(), customerSerde),Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name()));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)));
//KStream working
KStream<String, Customer> customerKStream= streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde));
customerKStream.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)))
经过大量研究,我发现我的语法存在问题。根据 Confluent/Kafka 文档,我使用的语法是有效的,但它不起作用。将向 Kafka 团队提出错误。现在,有效的新语法是
KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
.withKeySerde(Serdes.String())
.withValueSerde(customerSerde));
我应该包括 withKeySerde()
和 withValueSerde()
以使 KTable 工作。但是这个没有提到Confluent/Kafka文档
我正在尝试使用 KTable 来消费来自 Kafka 主题的事件。但是,它 returns 什么都没有。当我使用 KStream 时,它 returns 并打印对象。这真是奇怪。 Producer and Consumer can be found here
//Not working
KTable<String, Customer> customerKTable = streamsBuilder.table("customer", Consumed.with(Serdes.String(), customerSerde),Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name()));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)));
//KStream working
KStream<String, Customer> customerKStream= streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde));
customerKStream.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)))
经过大量研究,我发现我的语法存在问题。根据 Confluent/Kafka 文档,我使用的语法是有效的,但它不起作用。将向 Kafka 团队提出错误。现在,有效的新语法是
KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
.withKeySerde(Serdes.String())
.withValueSerde(customerSerde));
我应该包括 withKeySerde()
和 withValueSerde()
以使 KTable 工作。但是这个没有提到Confluent/Kafka文档