Kafka Ktable 查询
Kafka Ktable querying
正在尝试通过 KTable
从 "connect-configs" 主题中获取记录
public static void main(String... args) throws InterruptedException {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test_connect-configs_12");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Bytes().getClass());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Object> ktable = builder.table("connect-configs");
KafkaStreams streams = new KafkaStreams(builder.build(),config);
streams.cleanUp();
streams.start();
System.out.println(ktable.queryableStoreName());
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
ReadOnlyKeyValueStore<String, Object> view;
while (true) {
try {
System.out.println(ktable.queryableStoreName());
view = streams.store(ktable.queryableStoreName(), QueryableStoreTypes.keyValueStore());
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
};
ktable.queryableStoreName() 始终为空。为什么哪里没有查询的store?我看到像 "test_connect-configs_12-connect-configsSTATE-STORE-0000000000-changelog" 这样的主题。如何读取记录,如何获取KTable状态的变化事件?
您需要在创建 KTable
时为基础商店指定一个名称:
builder.table("connect-configs", Materialized.as<...>("my-store-name"));
正在尝试通过 KTable
从 "connect-configs" 主题中获取记录 public static void main(String... args) throws InterruptedException {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test_connect-configs_12");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Bytes().getClass());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Object> ktable = builder.table("connect-configs");
KafkaStreams streams = new KafkaStreams(builder.build(),config);
streams.cleanUp();
streams.start();
System.out.println(ktable.queryableStoreName());
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
ReadOnlyKeyValueStore<String, Object> view;
while (true) {
try {
System.out.println(ktable.queryableStoreName());
view = streams.store(ktable.queryableStoreName(), QueryableStoreTypes.keyValueStore());
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
};
ktable.queryableStoreName() 始终为空。为什么哪里没有查询的store?我看到像 "test_connect-configs_12-connect-configsSTATE-STORE-0000000000-changelog" 这样的主题。如何读取记录,如何获取KTable状态的变化事件?
您需要在创建 KTable
时为基础商店指定一个名称:
builder.table("connect-configs", Materialized.as<...>("my-store-name"));