Kafka Stream 自动读取新主题?
Kafka Stream automatically read from new topic?
有什么方法可以让我的 Kafka Stream 应用程序自动读取新创建的主题吗?
即使在流应用已经运行ning时创建主题?
像这样在主题名称中使用通配符:
KStream<String, String> rawText = builder.stream("topic-input-*");
为什么我需要这个?
现在,我有多个客户端向他们自己的主题发送数据(都具有相同的模式),我的流应用程序从这些主题中读取。然后我的应用程序进行一些转换并将结果写入 单个主题 .
虽然所有的客户都可以写同一个主题,但一个不守规矩的客户也可以代表其他人写。所以我为每个客户创建了单独的主题。问题是,每当有新客户到来时,我都会创建新主题并使用脚本为他们设置 ACL,但这还不够。我还必须停止我的流媒体应用程序,编辑代码,添加新主题,编译它,打包它,将它放在服务器上并再次 运行!
Kafka Streams 支持模式订阅:
builder.stream(Pattern.compile("topic-input-*"));
(我希望语法是正确的;从我的头顶不确定...但重点是,您可以使用 [=12= 的重载,而不是传递 String
] 采用模式的方法。)
有什么方法可以让我的 Kafka Stream 应用程序自动读取新创建的主题吗?
即使在流应用已经运行ning时创建主题?
像这样在主题名称中使用通配符:
KStream<String, String> rawText = builder.stream("topic-input-*");
为什么我需要这个?
现在,我有多个客户端向他们自己的主题发送数据(都具有相同的模式),我的流应用程序从这些主题中读取。然后我的应用程序进行一些转换并将结果写入 单个主题 .
虽然所有的客户都可以写同一个主题,但一个不守规矩的客户也可以代表其他人写。所以我为每个客户创建了单独的主题。问题是,每当有新客户到来时,我都会创建新主题并使用脚本为他们设置 ACL,但这还不够。我还必须停止我的流媒体应用程序,编辑代码,添加新主题,编译它,打包它,将它放在服务器上并再次 运行!
Kafka Streams 支持模式订阅:
builder.stream(Pattern.compile("topic-input-*"));
(我希望语法是正确的;从我的头顶不确定...但重点是,您可以使用 [=12= 的重载,而不是传递 String
] 采用模式的方法。)