在 Kafka Streams 应用程序中保持本地状态存储更新
Keeping the local state store updated in a Kafka Streams app
我在 Kafka 中有一个日志压缩主题,我正在 KTable 中读取它并创建一个商店。每当我更新键控消息时,当应用程序为 运行 时,我看不到最新状态。但是当我重新启动流应用程序时,我能够看到更新后的状态。如何在流应用程序 运行 不重启的情况下获取更新状态。
这是我用来迭代状态值的代码(在主函数中)。
final String queryableStore = metaTable.queryableStoreName();
ReadOnlyKeyValueStore<String,String> metaTableView = null;
try {
metaTableView = waitUntilStoreIsQueryable(queryableStore,QueryableStoreTypes.keyValueStore(),streams);
} catch (InterruptedException e) {
e.printStackTrace();
}
metaTableView.all().forEachRemaining(row -> logger.info("Key: "+row.key+" Value: "+row.value));
不确定您如何 运行 代码。但是如果我理解正确的话,你期望 logger.info
打印每个更新?如果是这种情况,那么这将不起作用,因为 .all()
遍历 KTable 一次并在之后终止。请注意,您在设计用于一次性查找的代码示例中使用 "Interactive Queries"。您需要在自己的代码中重新运行 查询——当然,您每次都会得到 所有 条记录。
如果你想持续获取KTable更新(而且只是更新),你需要直接使用KTable。例如,你可以做
KTable table = builder.table(...);
table.toStream().foreach(...);
KTable 的每次更新都会产生一个由 foreach()
处理的输出记录。注意记录缓存;您可能希望禁用缓存以获取所有更新,否则,某些更新可能不会被转发,因为它们受缓存影响。
我在 Kafka 中有一个日志压缩主题,我正在 KTable 中读取它并创建一个商店。每当我更新键控消息时,当应用程序为 运行 时,我看不到最新状态。但是当我重新启动流应用程序时,我能够看到更新后的状态。如何在流应用程序 运行 不重启的情况下获取更新状态。
这是我用来迭代状态值的代码(在主函数中)。
final String queryableStore = metaTable.queryableStoreName();
ReadOnlyKeyValueStore<String,String> metaTableView = null;
try {
metaTableView = waitUntilStoreIsQueryable(queryableStore,QueryableStoreTypes.keyValueStore(),streams);
} catch (InterruptedException e) {
e.printStackTrace();
}
metaTableView.all().forEachRemaining(row -> logger.info("Key: "+row.key+" Value: "+row.value));
不确定您如何 运行 代码。但是如果我理解正确的话,你期望 logger.info
打印每个更新?如果是这种情况,那么这将不起作用,因为 .all()
遍历 KTable 一次并在之后终止。请注意,您在设计用于一次性查找的代码示例中使用 "Interactive Queries"。您需要在自己的代码中重新运行 查询——当然,您每次都会得到 所有 条记录。
如果你想持续获取KTable更新(而且只是更新),你需要直接使用KTable。例如,你可以做
KTable table = builder.table(...);
table.toStream().foreach(...);
KTable 的每次更新都会产生一个由 foreach()
处理的输出记录。注意记录缓存;您可能希望禁用缓存以获取所有更新,否则,某些更新可能不会被转发,因为它们受缓存影响。