Kafka Streams - 是否可以在没有本地 Kafka Streams 实例的情况下 运行 远程交互式查询

Kafka Streams - Is it possible to run remote interactive queries without a local Kafka Streams instance

我有几个 Kafka Streams 应用程序实例想远程查询。

所有实例当前都在侦听指定的 host:port 对,每个实例都能够查询自己的本地状态存储并通过 REST 服务传递这些值。

+------------------+  +------------------+  +------------------+
|                  |  |                  |  |                  |
|  instance1:9191  |  |  instance2:9292  |  |  instance3:9393  |
|                  |  |                  |  |                  |
+------------------+  +------------------+  +------------------+

我想要一个单独的应用程序能够在这些情况下查询状态存储:

             consumer group                         separate application
+---------------------------------------+              _____
|   [instance1] [instance2] [instance3] |  <~-------  | app |
+---------------------------------------+              -----

单独的应用程序将利用 StreamsMetadataState::getAllMetadataForStore 中的相同逻辑为我的应用程序 运行ning 实例获取所有活动的 host/port 对,运行通过 REST 服务进行远程查询,并在其自身的应用程序逻辑中聚合数据。

但是,我很难实现它。由于 host/port 对似乎是通过消费者组协议进行通信的,因此看起来我需要实际实例化另一个 Kafka Streams 实例(即消费者组的另一个成员)以利用远程交互式查询.

我的问题是:

似乎应该有一个静态方法来获取状态存储元数据,它允许我们直接传递从构建器对象中提取的任何值。即

KafkaStreams::getMetaDataForStore(streamsConfig, storeName);

没有 API 支持。

推荐的模式是,向所有实例添加第二个 RPC(与 IQ 端口不同的端口,我们称之为 bootstrap-端口)。因此,知道一对host/bootstrap-port对就足以得到所有IQhost/port对的信息。

更新

您还可以使用 Kafka 主题传播 host/port 信息。每个实例在启动时将其 host/port 写入主题,您可以从那里读取此信息。棘手的部分是从主题到 expire/delete。如果您启用压缩,每个实例都可以为其 host/port 消息写入一个墓碑消息。但是,如果实例崩溃,您将获得未删除的旧信息。

另一方面,您可以将此方法与第一种方法结合起来(即,在主题中写 host/bootstrap-port 而不是 IQ host/port)。获得 one 有效 host/bootstrap-port 就足够了——如果你使用无效的,你的请求就会超时,你可以从你的查询应用程序中写一个墓碑来清理。

结束更新

如果这也不起作用,您可以"hack"绕过这个限制。成功重新平衡后,所有元数据都存储在主题 __consumer_offsets 中,理论上您可以从那里读取信息并提取所有 host/port 对。但是,您将依赖内部实现细节,因此您的代码可能会在升级时中断。

  • Is it possible to find the host/value pairs for all running instances of an application without also running a local Kafka Streams instance in the same consumer group? (I highlight running because I don't mind instantiating a dummy instance of the Kafka Streams app just to get the host/port meta data, but there is a validateRunning check that prevents me from doing so)

为什么不向您的(第一个)Kafka Streams 应用程序添加一个新的 REST API 方法,将当前活动的 host/port 对公开给您的第二个应用程序?应用程序实例当然会随时提供此信息。

第二个应用程序 -- "the separate app" -- 然后可以通过此 REST 方法查询任何(第一个)Kafka Streams 应用程序实例,以发现所有 运行ning 应用程序实例。在这里,您根本不需要 运行 第二个应用程序中的虚拟 KafkaStreams 实例。

  • Are there problems with the above design (running a separate app to query all instances of a Kafka Streams app)? i.e. maybe the behavior I'm talking about isn't supported because what I'm doing has ramifications that I haven't considered yet?

见上文。没有什么能阻止您向 Kafka Streams 应用程序的 REST 层添加更多方法。毕竟,您的(第一个)应用程序中使用 Kafka Streams API 的部分不需要是该应用程序的唯一部分。 :-) 我认为你的问题可能是你的想法有点 "boxed in" 在你觉得有义务通过 Kafka Streams 在你的应用程序中做 一切 API——但事实并非如此。毕竟,Kafka Streams API 背后的动机之一是让您将其与您希望在应用程序中利用的其他 API 和库混合使用。