Kafka 状态存储在分布式环境中不可用
Kafka state store not available in distributed environment
我有一个具有以下版本的业务应用程序
- spring boot(2.2.0.RELEASE) spring-Kafka(2.3.1-RELEASE)
spring-cloud-stream-binder-kafka(2.2.1-RELEASE)
spring-cloud-stream-binder-kafka-core(3.0.3-RELEASE)
spring-cloud-stream-binder-kafka-streams(3.0.3-RELEASE)
我们有大约 20 个 batches.Each 批次使用 6-7 个主题来处理 business.Each 服务有自己的状态存储来维护批次的状态,无论其是否 running/Idle。
使用以下代码查询商店
@Autowired
private InteractiveQueryService interactiveQueryService;
public ReadOnlyKeyValueStore<String, String> fetchKeyValueStoreBy(String storeName) {
while (true) {
try {
log.info("Waiting for state store");
return new ReadOnlyKeyValueStoreWrapper<>(interactiveQueryService.getQueryableStore(storeName,
QueryableStoreTypes.<String, String> keyValueStore()));
} catch (final IllegalStateException e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
在一个实例(Linux 机器)中部署应用程序时,一切正常 fine.While 在 2 个实例中部署应用程序,我们发现以下观察结果
state store 在一个实例中可用,而其他实例中没有。
当具有状态存储的实例正在处理请求时,一切都很好。
- 如果请求落在没有状态存储的实例上,应用程序将在 while 循环中无限等待(上面的代码片段)。
- 虽然没有存储的实例无限期地等待,如果我们杀死另一个实例,上面的代码 returns 存储并且它处理得很好。
不知道我们错过了什么。
当您有多个 Kafka Streams 处理器 运行 交互式查询时,上面显示的代码将无法按您预期的方式工作。如果您查询的键在同一台服务器上,它只会产生 returns 个结果。为了解决这个问题,您需要在每个实例上添加 属性 - spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
。确保在每台服务器上将服务器和端口更改为正确的。然后你要写类似下面的代码:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
请参阅 reference docs 了解更多信息。
这是一个证明这一点的 sample code。
我有一个具有以下版本的业务应用程序
- spring boot(2.2.0.RELEASE) spring-Kafka(2.3.1-RELEASE)
spring-cloud-stream-binder-kafka(2.2.1-RELEASE)
spring-cloud-stream-binder-kafka-core(3.0.3-RELEASE)
spring-cloud-stream-binder-kafka-streams(3.0.3-RELEASE)
我们有大约 20 个 batches.Each 批次使用 6-7 个主题来处理 business.Each 服务有自己的状态存储来维护批次的状态,无论其是否 running/Idle。 使用以下代码查询商店
@Autowired
private InteractiveQueryService interactiveQueryService;
public ReadOnlyKeyValueStore<String, String> fetchKeyValueStoreBy(String storeName) {
while (true) {
try {
log.info("Waiting for state store");
return new ReadOnlyKeyValueStoreWrapper<>(interactiveQueryService.getQueryableStore(storeName,
QueryableStoreTypes.<String, String> keyValueStore()));
} catch (final IllegalStateException e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
在一个实例(Linux 机器)中部署应用程序时,一切正常 fine.While 在 2 个实例中部署应用程序,我们发现以下观察结果
state store 在一个实例中可用,而其他实例中没有。
当具有状态存储的实例正在处理请求时,一切都很好。
- 如果请求落在没有状态存储的实例上,应用程序将在 while 循环中无限等待(上面的代码片段)。
- 虽然没有存储的实例无限期地等待,如果我们杀死另一个实例,上面的代码 returns 存储并且它处理得很好。
不知道我们错过了什么。
当您有多个 Kafka Streams 处理器 运行 交互式查询时,上面显示的代码将无法按您预期的方式工作。如果您查询的键在同一台服务器上,它只会产生 returns 个结果。为了解决这个问题,您需要在每个实例上添加 属性 - spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
。确保在每台服务器上将服务器和端口更改为正确的。然后你要写类似下面的代码:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
请参阅 reference docs 了解更多信息。 这是一个证明这一点的 sample code。