是否可以在程序运行时动态调整kafka流的num.stream.threads配置?

Is it possible to dynamically adjust the num.stream.threads configuration of kafka stream while the program is running?

我在一个服务中 运行 多个 kafka 流实例。我想在程序为运行时动态调整num.stream.threads配置来控制每个实例的优先级。

我在KafkaStreamclass上没有找到相关方法。

请问有没有其他办法?

无法在运行时更新已创建的 KafkaStreams 配置(它不仅与 属性 num.stream.threads 相关,还与其他人相关)。

作为一种解决方法,您可以通过停止现有流并创建和启动新流而不停止其他流和重新启动应用程序来重新创建特定的 KafkaStreams。它是否符合您的需求取决于您的具体用例。

这可以通过多种选择来实现。其中之一-根据特定的kafka流更新数据库中的配置(如num.stream.threads),并从应用程序的每个实例中从数据库中获取数据(例如,每10分钟通过cron表达式),如果发现任何更新-停止存在并启动一个具有所需更新配置的新 KafkaStream。如果您只有一个应用程序实例,则可以通过 REST 轻松实现。


更新自 kafka-streams 2.8.0

kafka-streams 2.8.0 以来,您可以在运行时添加和删除流线程,而无需重新创建流 (API to Start and Shut Down Stream Threads)

kafkaStreams.addStreamThread();
kafkaStreams.removeStreamThread();

这目前是不可能的。

如果要更改线程数,需要使用 KafkaStreams#close() 停止程序,使用更新的配置创建新的 KafkaStreams 实例,然后使用 [=12= 启动新实例].