Kafka Streams - 是否可以在没有本地 Kafka Streams 实例的情况下 运行 远程交互式查询
Kafka Streams - Is it possible to run remote interactive queries without a local Kafka Streams instance
我有几个 Kafka Streams 应用程序实例想远程查询。
所有实例当前都在侦听指定的 host:port 对,每个实例都能够查询自己的本地状态存储并通过 REST 服务传递这些值。
+------------------+ +------------------+ +------------------+
| | | | | |
| instance1:9191 | | instance2:9292 | | instance3:9393 |
| | | | | |
+------------------+ +------------------+ +------------------+
我想要一个单独的应用程序能够在这些情况下查询状态存储:
consumer group separate application
+---------------------------------------+ _____
| [instance1] [instance2] [instance3] | <~------- | app |
+---------------------------------------+ -----
单独的应用程序将利用 StreamsMetadataState::getAllMetadataForStore
中的相同逻辑为我的应用程序 运行ning 实例获取所有活动的 host/port 对,运行通过 REST 服务进行远程查询,并在其自身的应用程序逻辑中聚合数据。
但是,我很难实现它。由于 host/port 对似乎是通过消费者组协议进行通信的,因此看起来我需要实际实例化另一个 Kafka Streams 实例(即消费者组的另一个成员)以利用远程交互式查询.
我的问题是:
- 是否可以在没有 运行ning 本地 Kafka 的情况下为应用程序的所有 运行ning 实例找到 host/value 对同一个消费者组中的流实例? (我突出显示 运行ning 因为我不介意实例化 Kafka Streams 应用程序的虚拟实例只是为了获取 host/port 元数据,但是有一个
validateRunning
检查是否阻止我这样做)
- 上述设计是否存在问题(运行使用单独的应用程序来查询 Kafka Streams 应用程序的所有实例)?即,也许我正在谈论的行为不受支持,因为我正在做的事情有我尚未考虑的后果?
似乎应该有一个静态方法来获取状态存储元数据,它允许我们直接传递从构建器对象中提取的任何值。即
KafkaStreams::getMetaDataForStore(streamsConfig, storeName);
没有 API 支持。
推荐的模式是,向所有实例添加第二个 RPC(与 IQ 端口不同的端口,我们称之为 bootstrap-端口)。因此,知道一对host/bootstrap-port对就足以得到所有IQhost/port对的信息。
更新
您还可以使用 Kafka 主题传播 host/port 信息。每个实例在启动时将其 host/port 写入主题,您可以从那里读取此信息。棘手的部分是从主题到 expire/delete。如果您启用压缩,每个实例都可以为其 host/port 消息写入一个墓碑消息。但是,如果实例崩溃,您将获得未删除的旧信息。
另一方面,您可以将此方法与第一种方法结合起来(即,在主题中写 host/bootstrap-port 而不是 IQ host/port)。获得 one 有效 host/bootstrap-port 就足够了——如果你使用无效的,你的请求就会超时,你可以从你的查询应用程序中写一个墓碑来清理。
结束更新
如果这也不起作用,您可以"hack"绕过这个限制。成功重新平衡后,所有元数据都存储在主题 __consumer_offsets
中,理论上您可以从那里读取信息并提取所有 host/port 对。但是,您将依赖内部实现细节,因此您的代码可能会在升级时中断。
- Is it possible to find the host/value pairs for all running instances of an application without also running a local Kafka Streams instance in the same consumer group? (I highlight running because I don't mind instantiating a dummy instance of the Kafka Streams app just to get the host/port meta data, but there is a validateRunning check that prevents me from doing so)
为什么不向您的(第一个)Kafka Streams 应用程序添加一个新的 REST API 方法,将当前活动的 host/port 对公开给您的第二个应用程序?应用程序实例当然会随时提供此信息。
第二个应用程序 -- "the separate app" -- 然后可以通过此 REST 方法查询任何(第一个)Kafka Streams 应用程序实例,以发现所有 运行ning 应用程序实例。在这里,您根本不需要 运行 第二个应用程序中的虚拟 KafkaStreams
实例。
- Are there problems with the above design (running a separate app to query all instances of a Kafka Streams app)? i.e. maybe the behavior I'm talking about isn't supported because what I'm doing has ramifications that I haven't considered yet?
见上文。没有什么能阻止您向 Kafka Streams 应用程序的 REST 层添加更多方法。毕竟,您的(第一个)应用程序中使用 Kafka Streams API 的部分不需要是该应用程序的唯一部分。 :-) 我认为你的问题可能是你的想法有点 "boxed in" 在你觉得有义务通过 Kafka Streams 在你的应用程序中做 一切 API——但事实并非如此。毕竟,Kafka Streams API 背后的动机之一是让您将其与您希望在应用程序中利用的其他 API 和库混合使用。
我有几个 Kafka Streams 应用程序实例想远程查询。
所有实例当前都在侦听指定的 host:port 对,每个实例都能够查询自己的本地状态存储并通过 REST 服务传递这些值。
+------------------+ +------------------+ +------------------+
| | | | | |
| instance1:9191 | | instance2:9292 | | instance3:9393 |
| | | | | |
+------------------+ +------------------+ +------------------+
我想要一个单独的应用程序能够在这些情况下查询状态存储:
consumer group separate application
+---------------------------------------+ _____
| [instance1] [instance2] [instance3] | <~------- | app |
+---------------------------------------+ -----
单独的应用程序将利用 StreamsMetadataState::getAllMetadataForStore
中的相同逻辑为我的应用程序 运行ning 实例获取所有活动的 host/port 对,运行通过 REST 服务进行远程查询,并在其自身的应用程序逻辑中聚合数据。
但是,我很难实现它。由于 host/port 对似乎是通过消费者组协议进行通信的,因此看起来我需要实际实例化另一个 Kafka Streams 实例(即消费者组的另一个成员)以利用远程交互式查询.
我的问题是:
- 是否可以在没有 运行ning 本地 Kafka 的情况下为应用程序的所有 运行ning 实例找到 host/value 对同一个消费者组中的流实例? (我突出显示 运行ning 因为我不介意实例化 Kafka Streams 应用程序的虚拟实例只是为了获取 host/port 元数据,但是有一个
validateRunning
检查是否阻止我这样做) - 上述设计是否存在问题(运行使用单独的应用程序来查询 Kafka Streams 应用程序的所有实例)?即,也许我正在谈论的行为不受支持,因为我正在做的事情有我尚未考虑的后果?
似乎应该有一个静态方法来获取状态存储元数据,它允许我们直接传递从构建器对象中提取的任何值。即
KafkaStreams::getMetaDataForStore(streamsConfig, storeName);
没有 API 支持。
推荐的模式是,向所有实例添加第二个 RPC(与 IQ 端口不同的端口,我们称之为 bootstrap-端口)。因此,知道一对host/bootstrap-port对就足以得到所有IQhost/port对的信息。
更新
您还可以使用 Kafka 主题传播 host/port 信息。每个实例在启动时将其 host/port 写入主题,您可以从那里读取此信息。棘手的部分是从主题到 expire/delete。如果您启用压缩,每个实例都可以为其 host/port 消息写入一个墓碑消息。但是,如果实例崩溃,您将获得未删除的旧信息。
另一方面,您可以将此方法与第一种方法结合起来(即,在主题中写 host/bootstrap-port 而不是 IQ host/port)。获得 one 有效 host/bootstrap-port 就足够了——如果你使用无效的,你的请求就会超时,你可以从你的查询应用程序中写一个墓碑来清理。
结束更新
如果这也不起作用,您可以"hack"绕过这个限制。成功重新平衡后,所有元数据都存储在主题 __consumer_offsets
中,理论上您可以从那里读取信息并提取所有 host/port 对。但是,您将依赖内部实现细节,因此您的代码可能会在升级时中断。
- Is it possible to find the host/value pairs for all running instances of an application without also running a local Kafka Streams instance in the same consumer group? (I highlight running because I don't mind instantiating a dummy instance of the Kafka Streams app just to get the host/port meta data, but there is a validateRunning check that prevents me from doing so)
为什么不向您的(第一个)Kafka Streams 应用程序添加一个新的 REST API 方法,将当前活动的 host/port 对公开给您的第二个应用程序?应用程序实例当然会随时提供此信息。
第二个应用程序 -- "the separate app" -- 然后可以通过此 REST 方法查询任何(第一个)Kafka Streams 应用程序实例,以发现所有 运行ning 应用程序实例。在这里,您根本不需要 运行 第二个应用程序中的虚拟 KafkaStreams
实例。
- Are there problems with the above design (running a separate app to query all instances of a Kafka Streams app)? i.e. maybe the behavior I'm talking about isn't supported because what I'm doing has ramifications that I haven't considered yet?
见上文。没有什么能阻止您向 Kafka Streams 应用程序的 REST 层添加更多方法。毕竟,您的(第一个)应用程序中使用 Kafka Streams API 的部分不需要是该应用程序的唯一部分。 :-) 我认为你的问题可能是你的想法有点 "boxed in" 在你觉得有义务通过 Kafka Streams 在你的应用程序中做 一切 API——但事实并非如此。毕竟,Kafka Streams API 背后的动机之一是让您将其与您希望在应用程序中利用的其他 API 和库混合使用。