Kafka 状态存储在分布式环境中不可用

Kafka state store not available in distributed environment

我有一个具有以下版本的业务应用程序

我们有大约 20 个 batches.Each 批次使用 6-7 个主题来处理 business.Each 服务有自己的状态存储来维护批次的状态,无论其是否 running/Idle。 使用以下代码查询商店

@Autowired
private InteractiveQueryService interactiveQueryService;
      public ReadOnlyKeyValueStore<String, String> fetchKeyValueStoreBy(String storeName) {
        while (true) {
            try {
                log.info("Waiting for state store");
                return new ReadOnlyKeyValueStoreWrapper<>(interactiveQueryService.getQueryableStore(storeName,
                        QueryableStoreTypes.<String, String> keyValueStore()));
            } catch (final IllegalStateException e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }

在一个实例(Linux 机器)中部署应用程序时,一切正常 fine.While 在 2 个实例中部署应用程序,我们发现以下观察结果

  1. state store 在一个实例中可用,而其他实例中没有。

  2. 当具有状态存储的实例正在处理请求时,一切都很好。

  3. 如果请求落在没有状态存储的实例上,应用程序将在 while 循环中无限等待(上面的代码片段)。
  4. 虽然没有存储的实例无限期地等待,如果我们杀死另一个实例,上面的代码 returns 存储并且它处理得很好。

不知道我们错过了什么。

当您有多个 Kafka Streams 处理器 运行 交互式查询时,上面显示的代码将无法按您预期的方式工作。如果您查询的键在同一台服务器上,它只会产生 returns 个结果。为了解决这个问题,您需要在每个实例上添加 属性 - spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>。确保在每台服务器上将服务器和端口更改为正确的。然后你要写类似下面的代码:

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
                        key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

请参阅 reference docs 了解更多信息。 这是一个证明这一点的 sample code